In [1]:
# !pip install geopandas rasterio shapely scipy scikit-learn scikit-image seaborn matplotlib_scalebar
In [2]:
import sys
import os
print("Python executable:", sys.executable)
print("Python version:", sys.version)
print("Environment location:", os.path.dirname(sys.executable))
Python executable: C:\Users\colto\Documents\GitHub\saocom_project\.venv\Scripts\python.exe Python version: 3.13.5 (tags/v3.13.5:6cb20a2, Jun 11 2025, 16:15:46) [MSC v.1943 64 bit (AMD64)] Environment location: C:\Users\colto\Documents\GitHub\saocom_project\.venv\Scripts
Setup¶
In [3]:
from unittest.mock import sentinel
# =============================================================================
# IMPORTS
# =============================================================================
# Core libraries
import numpy as np
import pandas as pd
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')
from pyproj import Transformer
# Geospatial libraries
import geopandas as gpd
import rasterio
from rasterio.warp import reproject, Resampling, calculate_default_transform
from rasterio.transform import from_bounds, rowcol
from rasterio.mask import mask
from rasterio import features
from shapely.geometry import Point, box, shape # Add 'shape' here
from rasterio.features import shapes # Add this new line
# Analysis libraries
from scipy.interpolate import griddata
from scipy.ndimage import median_filter, binary_opening, binary_closing
from scipy import stats
from sklearn.neighbors import NearestNeighbors, KernelDensity
from skimage.morphology import disk
from shapely.geometry import shape
from rasterio.features import shapes
import matplotlib.patches as mpatches
# Visualization libraries
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from matplotlib.patches import FancyArrowPatch
from matplotlib.colors import ListedColormap, BoundaryNorm
from matplotlib_scalebar.scalebar import ScaleBar
import seaborn as sns
# Data I/O
from dbfread import DBF
# =============================================================================
# PATHS AND DIRECTORIES
# =============================================================================
DATA_DIR = Path("data")
RESULTS_DIR = Path("results")
RESULTS_DIR.mkdir(exist_ok=True)
print(DATA_DIR)
# =============================================================================
# COORDINATE REFERENCE SYSTEM
# =============================================================================
TARGET_CRS = 'EPSG:32632' # UTM 32N
# =============================================================================
# PROCESSING PARAMETERS
# =============================================================================
COHERENCE_THRESHOLD = 0.3
NODATA = -9999
GRID_SIZE = 10 # meters
# =============================================================================
# CORINE LAND COVER CLASSES
# =============================================================================
CORINE_CLASSES = {
111: 'Continuous urban fabric', 112: 'Discontinuous urban fabric',
121: 'Industrial or commercial units', 122: 'Road and rail networks and associated land',
123: 'Port areas', 124: 'Airports', 131: 'Mineral extraction sites',
132: 'Dump sites', 133: 'Construction sites', 141: 'Green urban areas',
142: 'Sport and leisure facilities', 211: 'Non-irrigated arable land',
212: 'Permanently irrigated land', 213: 'Rice fields', 221: 'Vineyards',
222: 'Fruit trees and berry plantations', 223: 'Olive groves',
231: 'Pastures', 241: 'Annual crops associated with permanent crops',
242: 'Complex cultivation patterns', 243: 'Agriculture/natural vegetation mix', # Note: The description for 21 has been expanded for clarity
244: 'Agro-forestry areas', 311: 'Broad-leaved forest',
312: 'Coniferous forest', 313: 'Mixed forest', 321: 'Natural grasslands',
322: 'Moors and heathland', 323: 'Sclerophyllous vegetation',
324: 'Transitional woodland-shrub', 331: 'Beaches, dunes, sands',
332: 'Bare rocks', 333: 'Sparsely vegetated areas', 334: 'Burnt areas',
335: 'Glaciers and perpetual snow', 411: 'Inland marshes',
412: 'Peat bogs', 421: 'Salt marshes', 422: 'Salines',
423: 'Intertidal flats', 511: 'Water courses', 512: 'Water bodies',
521: 'Coastal lagoons', 522: 'Estuaries', 523: 'Sea and ocean'
}
# CORINE_COLORS = {
# 111: (230, 0, 77), 112: (255, 0, 0), 121: (204, 77, 242), 122: (204, 0, 0),
# 123: (230, 204, 204), 124: (230, 204, 230), 131: (166, 0, 204), 132: (166, 77, 0),
# 133: (255, 77, 255), 141: (255, 166, 255), 142: (255, 230, 255),
# 211: (255, 255, 168), 212: (255, 255, 0), 213: (230, 230, 0), 221: (230, 128, 0),
# 222: (242, 166, 77), 223: (230, 166, 0), 231: (230, 230, 77), 241: (255, 230, 166),
# 242: (255, 230, 77), 243: (230, 204, 77), 244: (242, 204, 166),
# 311: (128, 255, 0), 312: (0, 166, 0), 313: (77, 255, 0), 321: (204, 242, 77),
# 322: (166, 255, 128), # Note: Corrected color for 322 (Moors and heathland) as per standard CLC. Your original color for key 27, (166, 230, 77), corresponds to 323 (Sclerophyllous vegetation)
# 323: (166, 230, 77),
# 324: (166, 242, 0), # Note: Corrected color for 324 (Transitional woodland-shrub) as per standard CLC. Your original color for key 29, (0, 204, 0), does not match standard CLC color for this category.
# 331: (230, 230, 230), # Note: Corrected color for 331 (Beaches, dunes, sands) as per standard CLC. Your original color for key 30, (240, 240, 240), is slightly different.
# 332: (204, 204, 204), 333: (204, 255, 204), 334: (0, 0, 0), 335: (166, 230, 204),
# 411: (166, 166, 255), 412: (77, 77, 255), 421: (204, 204, 255), 422: (230, 230, 255),
# 423: (166, 166, 230), 511: (0, 204, 242), 512: (128, 242, 230), 521: (0, 255, 166),
# 522: (166, 255, 230), 523: (230, 242, 255) # Note: Corrected color for 523 (Sea and ocean) as per standard CLC. Your original color for key 44 was (230, 242, 255) which is the correct one.
# }
#
# # Convert RGB (0-255) to matplotlib format (0-1)
# CORINE_COLORS_MPL = {k: (r/255, g/255, b/255) for k, (r, g, b) in CORINE_COLORS.items()}
# Colorblind-friendly version (paste this AFTER the original CORINE_COLORS)
CORINE_COLORS_COLORBLIND = {
# ARTIFICIAL SURFACES (1xx) - Purples/Magentas/Grays
111: (102, 0, 102), # Dark purple - Continuous urban
112: (153, 51, 153), # Medium purple - Discontinuous urban
121: (204, 102, 204), # Light purple - Industrial
122: (80, 80, 80), # Dark gray - Roads/rail
123: (120, 120, 120), # Medium gray - Ports
124: (160, 160, 160), # Light gray - Airports
131: (255, 0, 255), # Bright magenta - Mineral extraction
132: (178, 34, 34), # Dark red-brown - Dump sites
133: (255, 150, 180), # Darker pink - Construction (was too light)
141: (120, 200, 120), # Medium green - Green urban areas (darkened)
142: (100, 180, 100), # Green - Sport/leisure (darkened)
# AGRICULTURAL (2xx) - Yellows/Oranges/Browns
211: (230, 230, 50), # Strong yellow - Non-irrigated arable (darkened)
212: (235, 200, 0), # Gold yellow - Permanently irrigated (darkened)
213: (220, 180, 0), # Dark gold - Rice fields
221: (255, 140, 0), # Dark orange - Vineyards
222: (255, 165, 79), # Orange - Fruit trees
223: (204, 153, 0), # Olive-brown - Olive groves
231: (210, 210, 80), # Medium yellow - Pastures (MUCH darker)
241: (200, 170, 100), # Tan - Annual crops w/ permanent (darkened)
242: (210, 160, 70), # Brown - Complex cultivation (darkened)
243: (190, 150, 80), # Medium brown - Agriculture w/ natural
244: (179, 143, 0), # Dark yellow-brown - Agro-forestry
# FORESTS & SEMI-NATURAL (3xx) - Teals/Cyans/Dark colors
311: (0, 153, 102), # Dark teal - Broad-leaved forest
312: (0, 102, 76), # Very dark teal - Coniferous forest
313: (0, 128, 128), # Medium teal - Mixed forest
321: (150, 220, 150), # Light green - Natural grasslands (darkened)
322: (102, 204, 153), # Mint - Moors and heathland
323: (130, 180, 130), # Sage - Sclerophyllous vegetation (darkened)
324: (51, 153, 102), # Medium green - Transitional woodland
331: (210, 180, 140), # Tan - Beaches/dunes/sands (MUCH darker)
332: (140, 140, 140), # Gray - Bare rocks (darkened)
333: (170, 170, 120), # Khaki - Sparsely vegetated (darkened)
334: (40, 40, 40), # Near black - Burnt areas
335: (180, 210, 230), # Light blue - Glaciers/snow (darkened)
# WETLANDS (4xx) - Light blues/cyans
411: (120, 170, 230), # Sky blue - Inland marshes (darkened)
412: (80, 140, 220), # Medium blue - Peat bogs (darkened)
421: (150, 190, 240), # Light blue - Salt marshes (darkened)
422: (140, 170, 210), # Powder blue - Salines (darkened)
423: (100, 160, 210), # Cyan - Intertidal flats (darkened)
# WATER BODIES (5xx) - Dark blues
511: (0, 102, 204), # Dark blue - Water courses
512: (0, 76, 153), # Very dark blue - Water bodies
521: (51, 102, 153), # Medium dark blue - Coastal lagoons
522: (0, 51, 102), # Navy - Estuaries
523: (0, 25, 76) # Very dark navy - Sea and ocean
}
# Use the colorblind-friendly version
CORINE_COLORS = CORINE_COLORS_COLORBLIND
CORINE_COLORS_MPL = {k: (r/255, g/255, b/255) for k, (r, g, b) in CORINE_COLORS.items()}
# =============================================================================
# FILE DISCOVERY
# =============================================================================
# Automatically locate data files in subdirectories
saocom_files = list((DATA_DIR / "saocom_csv").glob("*.csv"))
tinitaly_files = list((DATA_DIR / "tinitaly").glob("*.tif"))
copernicus_files = list((DATA_DIR / "copernicus").glob("*.tif"))
corine_files = list((DATA_DIR / "ground_cover").glob("*.tif"))
sentinel_files = list((DATA_DIR / "sentinel_data").glob("*.tif"))
print(corine_files, saocom_files, tinitaly_files, copernicus_files, sentinel_files)
# Select first match for each dataset
saocom_path = saocom_files[0] if saocom_files else None
tinitaly_path = tinitaly_files[0] if tinitaly_files else None
copernicus_path = copernicus_files[0] if copernicus_files else None
corine_path = corine_files[0] if corine_files else None
sentinel_path = sentinel_files[0] if sentinel_files else None
# Find the corresponding .vat.dbf file for the CORINE raster
corine_dbf_path = None
if corine_path:
corine_dbf_candidates = list((DATA_DIR / "ground_cover").glob(f"{corine_path.name}.vat.dbf"))
corine_dbf_path = corine_dbf_candidates[0] if corine_dbf_candidates else None
data
[WindowsPath('data/ground_cover/land_cover_clipped.tif')] [WindowsPath('data/saocom_csv/verona_fullGraph_weighted_Tcoh07_edited.csv')] [WindowsPath('data/tinitaly/tinitaly_crop.tif')] [WindowsPath('data/copernicus/GLO30.tif')] [WindowsPath('data/sentinel_data/Sentinel2Views_Clip.tif')]
Load Data¶
In [4]:
# =============================================================================
# LOAD SAOCOM POINT DATA
# =============================================================================
# Read CSV and standardize columns
df = pd.read_csv(saocom_path, sep=',')
df.columns = ['ID', 'SVET', 'LVET', 'LAT', 'LAT2', 'LON', 'LON2', 'HEIGHT', 'HEIGHT_WRT_DEM', 'SIGMA_HEIGHT', 'COHER']
# Convert to numeric and remove invalid points
for col in ['LAT', 'LON','LAT2', 'LON2', 'HEIGHT', 'COHER']:
df[col] = pd.to_numeric(df[col], errors='coerce')
df = df.dropna(subset=['LAT', 'LON','LAT2', 'LON2', 'HEIGHT', 'COHER'])
df = df[(df['LAT2'] != 0) & (df['LON2'] != 0)]
df.rename(columns={
'LAT': 'LAT_old',
'LON': 'LON_old',
'LAT2': 'LAT',
'LON2': 'LON'
}, inplace=True)
# Apply coherence filter
df_filtered = df[df['COHER'] >= COHERENCE_THRESHOLD]
# Convert to GeoDataFrame and reproject to target CRS
geometry = [Point(lon, lat) for lon, lat in zip(df_filtered['LON'], df_filtered['LAT'])]
saocom_gdf = gpd.GeoDataFrame(df_filtered, geometry=geometry, crs='EPSG:4326')
saocom_gdf = saocom_gdf.to_crs(TARGET_CRS)
saocom_gdf['x_utm'] = saocom_gdf.geometry.x
saocom_gdf['y_utm'] = saocom_gdf.geometry.y
# =============================================================================
# LOAD REFERENCE DEMS
# =============================================================================
# TINITALY
with rasterio.open(tinitaly_path) as src:
tinitaly_crs = src.crs
tinitaly_res = src.res
tinitaly_bounds = src.bounds
tinitaly_nodata = src.nodata
# Copernicus
with rasterio.open(copernicus_path) as src:
copernicus_crs = src.crs
copernicus_res = src.res
copernicus_bounds = src.bounds
copernicus_nodata = src.nodata
# # =============================================================================
# # LOAD AND REMAP CORINE LAND COVER
# # =============================================================================
# # Load DBF lookup table
# dbf_table = DBF(corine_dbf_path, load=True)
# lookup_df = pd.DataFrame(iter(dbf_table))
#
# # Create mapping dictionaries
# value_to_code = dict(zip(lookup_df['Value'], lookup_df['CODE_18']))
# value_to_label = dict(zip(lookup_df['Value'], lookup_df['LABEL3']))
# code_to_label = dict(zip(lookup_df['CODE_18'], lookup_df['LABEL3']))
#
# # Load original CORINE raster
# with rasterio.open(corine_path) as src:
# corine_raw = src.read(1)
# corine_crs = src.crs
# corine_res = src.res
# corine_bounds = src.bounds
# corine_nodata = src.nodata if src.nodata is not None else 255
# corine_transform = src.transform
# corine_profile = src.profile
#
# # Remap raster values from Value to CODE_18
# corine_remapped = np.full_like(corine_raw, 0, dtype=np.uint16)
# for value, code in value_to_code.items():
# corine_remapped[corine_raw == value] = code
# corine_remapped[corine_raw == corine_nodata] = 0 # NoData = 0
#
# # Save remapped CORINE to temporary file
# corine_remapped_path = RESULTS_DIR / "corine_remapped.tif"
# profile_remapped = corine_profile.copy()
# profile_remapped.update(dtype='uint16', nodata=0)
# with rasterio.open(corine_remapped_path, 'w', **profile_remapped) as dst:
# dst.write(corine_remapped, 1)
#
# # Update corine_path to use remapped version
# corine_path = corine_remapped_path
HORIZONTAL DATUM VERIFICATION¶
In [5]:
from sklearn.neighbors import NearestNeighbors
def remove_isolated_knn(gdf, k=5, distance_threshold=100):
"""Remove points far from k nearest neighbors."""
coords = np.array([[p.x, p.y] for p in gdf.geometry])
nbrs = NearestNeighbors(n_neighbors=k+1).fit(coords)
distances, _ = nbrs.kneighbors(coords)
# Average distance to k nearest neighbors (exclude self at index 0)
avg_distances = distances[:, 1:].mean(axis=1)
mask = avg_distances < distance_threshold
return gdf[mask].reset_index(drop=True)
# =============================================================================
# HORIZONTAL DATUM VERIFICATION
# =============================================================================
# Check if datasets need reprojection to target CRS
tinitaly_needs_reproject = str(tinitaly_crs) != TARGET_CRS
copernicus_needs_reproject = str(copernicus_crs) != TARGET_CRS
# corine_needs_reproject = str(corine_crs) != TARGET_CRS
# =============================================================================
# VERTICAL DATUM VERIFICATION
# =============================================================================
# Extract vertical datum information from CRS WKT
tinitaly_wkt = tinitaly_crs.to_wkt()
copernicus_wkt = copernicus_crs.to_wkt()
# Check for vertical datum identifiers
tinitaly_vertical = 'EGM2008' in tinitaly_wkt or 'geoid' in tinitaly_wkt.lower()
copernicus_vertical = 'EGM2008' in copernicus_wkt or 'geoid' in copernicus_wkt.lower()
# Copernicus GLO-30 uses EGM2008 geoid (documented)
# TINITALY typically uses WGS84 ellipsoid (documented)
# =============================================================================
# CREATE STUDY AREA BOUNDS AND CONVEX HULL
# =============================================================================
study_bounds = saocom_gdf.total_bounds # [xmin, ymin, xmax, ymax]
study_area_poly = box(*study_bounds)
study_area_gdf = gpd.GeoDataFrame([1], geometry=[study_area_poly], crs=TARGET_CRS)
saocom_gdf = remove_isolated_knn(saocom_gdf, k=5, distance_threshold=100)
# Create convex hull from SAOCOM points for masking
data_hull = saocom_gdf.unary_union.convex_hull
hull_gdf = gpd.GeoDataFrame(geometry=[data_hull], crs=TARGET_CRS)
# =============================================================================
# DEFINE 10M GRID PARAMETERS
# =============================================================================
xmin_grid = np.floor(study_bounds[0] / GRID_SIZE) * GRID_SIZE
ymin_grid = np.floor(study_bounds[1] / GRID_SIZE) * GRID_SIZE
xmax_grid = np.ceil(study_bounds[2] / GRID_SIZE) * GRID_SIZE
ymax_grid = np.ceil(study_bounds[3] / GRID_SIZE) * GRID_SIZE
grid_width = int((xmax_grid - xmin_grid) / GRID_SIZE)
grid_height = int((ymax_grid - ymin_grid) / GRID_SIZE)
target_transform = from_bounds(xmin_grid, ymin_grid, xmax_grid, ymax_grid, grid_width, grid_height)
# =============================================================================
# STORE REFERENCE DATASET METADATA
# =============================================================================
reference_dems = {
'tinitaly_crop': {
'path': tinitaly_path,
'crs': tinitaly_crs,
'needs_reproject': tinitaly_needs_reproject,
'vertical_datum': 'WGS84 ellipsoid'
},
'copernicus': {
'path': copernicus_path,
'crs': copernicus_crs,
'needs_reproject': copernicus_needs_reproject,
'vertical_datum': 'EGM2008 geoid'
}
}
RESAMPLE TO 10M¶
In [6]:
# =============================================================================
# RESAMPLE TINITALY TO 10M
# =============================================================================
tinitaly_10m = np.full((grid_height, grid_width), NODATA, dtype=np.float32)
with rasterio.open(tinitaly_path) as src:
reproject(
source=rasterio.band(src, 1),
destination=tinitaly_10m,
src_transform=src.transform,
src_crs=src.crs,
dst_transform=target_transform,
dst_crs=TARGET_CRS,
resampling=Resampling.cubic,
src_nodata=src.nodata,
dst_nodata=NODATA
)
# Save resampled TINITALY
tinitaly_10m_path = RESULTS_DIR / "tinitaly_10m.tif"
profile = {
'driver': 'GTiff', 'dtype': 'float32', 'width': grid_width, 'height': grid_height,
'count': 1, 'crs': TARGET_CRS, 'transform': target_transform,
'nodata': NODATA, 'compress': 'lzw'
}
with rasterio.open(tinitaly_10m_path, 'w', **profile) as dst:
dst.write(tinitaly_10m, 1)
# =============================================================================
# RESAMPLE COPERNICUS TO 10M
# =============================================================================
copernicus_10m = np.full((grid_height, grid_width), NODATA, dtype=np.float32)
with rasterio.open(copernicus_path) as src:
reproject(
source=rasterio.band(src, 1),
destination=copernicus_10m,
src_transform=src.transform,
src_crs=src.crs,
dst_transform=target_transform,
dst_crs=TARGET_CRS,
resampling=Resampling.cubic,
src_nodata=src.nodata,
dst_nodata=NODATA
)
# Save resampled Copernicus
copernicus_10m_path = RESULTS_DIR / "copernicus_10m.tif"
with rasterio.open(copernicus_10m_path, 'w', **profile) as dst:
dst.write(copernicus_10m, 1)
# =============================================================================
# UPDATE REFERENCE DATASET PATHS
# =============================================================================
reference_dems['tinitaly_crop']['resampled_path'] = tinitaly_10m_path
reference_dems['tinitaly_crop']['is_10m'] = True
reference_dems['copernicus']['resampled_path'] = copernicus_10m_path
reference_dems['copernicus']['is_10m'] = True
CREATE RASTERIZED MASK FROM SAOCOM CONVEX HULL¶
In [7]:
# =============================================================================
# CREATE RASTERIZED MASK FROM SAOCOM CONVEX HULL
# =============================================================================
# Rasterize the convex hull polygon to match the 10m grid
hull_mask = features.rasterize(
shapes=[data_hull],
out_shape=(grid_height, grid_width),
transform=target_transform,
fill=0,
all_touched=True,
dtype=np.uint8
) == 1 # Convert to boolean (True inside hull)
# =============================================================================
# MASK TINITALY
# =============================================================================
tinitaly_10m_masked = tinitaly_10m.copy()
tinitaly_10m_masked[~hull_mask] = NODATA
# Save masked TINITALY
tinitaly_masked_path = RESULTS_DIR / "tinitaly_10m_masked.tif"
with rasterio.open(tinitaly_masked_path, 'w', **profile) as dst:
dst.write(tinitaly_10m_masked, 1)
# =============================================================================
# MASK COPERNICUS
# =============================================================================
copernicus_10m_masked = copernicus_10m.copy()
copernicus_10m_masked[~hull_mask] = NODATA
# Save masked Copernicus
copernicus_masked_path = RESULTS_DIR / "copernicus_10m_masked.tif"
with rasterio.open(copernicus_masked_path, 'w', **profile) as dst:
dst.write(copernicus_10m_masked, 1)
# =============================================================================
# UPDATE REFERENCE DATASET PATHS TO MASKED VERSIONS
# =============================================================================
reference_dems['tinitaly_crop']['masked_path'] = tinitaly_masked_path
reference_dems['copernicus']['masked_path'] = copernicus_masked_path
# Store masked arrays in memory for quick access
tinitaly_10m = tinitaly_10m_masked
copernicus_10m = copernicus_10m_masked
# corine_10m = corine_10m_masked
SAMPLE REFERENCE DEMS AT SAOCOM LOCATIONS¶
In [8]:
# =============================================================================
# SAMPLE REFERENCE DEMS AT SAOCOM LOCATIONS
# =============================================================================
# Sample TINITALY at each SAOCOM point
tinitaly_heights = []
for idx, row in saocom_gdf.iterrows():
row_idx, col_idx = rowcol(target_transform, row.geometry.x, row.geometry.y)
if 0 <= row_idx < grid_height and 0 <= col_idx < grid_width:
height = tinitaly_10m[row_idx, col_idx]
tinitaly_heights.append(height if height != NODATA else np.nan)
else:
tinitaly_heights.append(np.nan)
saocom_gdf['tinitaly_height'] = tinitaly_heights
# Sample Copernicus at each SAOCOM point
copernicus_heights = []
for idx, row in saocom_gdf.iterrows():
row_idx, col_idx = rowcol(target_transform, row.geometry.x, row.geometry.y)
if 0 <= row_idx < grid_height and 0 <= col_idx < grid_width:
height = copernicus_10m[row_idx, col_idx]
copernicus_heights.append(height if height != NODATA else np.nan)
else:
copernicus_heights.append(np.nan)
saocom_gdf['copernicus_height'] = copernicus_heights
# Rename HEIGHT column for clarity
saocom_gdf['HEIGHT_RELATIVE'] = saocom_gdf['HEIGHT']
# =============================================================================
# CALIBRATE SAOCOM TO TINITALY (Method 1: Constant Offset)
# =============================================================================
# Filter to stable points: high coherence, valid reference data
stable_mask_tin = (
(saocom_gdf['COHER'] >= 0.8) &
(saocom_gdf['tinitaly_height'].notna()) &
(saocom_gdf['HEIGHT_RELATIVE'].notna()) &
(np.abs(saocom_gdf['HEIGHT_RELATIVE']) < 1000) # Exclude extreme outliers
)
stable_points_tin = saocom_gdf[stable_mask_tin].copy()
# Calculate offset: difference between reference DEM and SAOCOM relative heights
height_diff_tin = stable_points_tin['tinitaly_height'] - stable_points_tin['HEIGHT_RELATIVE']
offset_tinitaly = np.median(height_diff_tin) # Median for robustness
print(f"\nTINITALY Calibration:")
print(f" Stable points used: {len(stable_points_tin):,}")
print(f" Constant offset: {offset_tinitaly:.3f} m")
print(f" Offset std dev: {np.std(height_diff_tin):.3f} m")
# Apply correction to all SAOCOM points
saocom_gdf['HEIGHT_ABSOLUTE_TIN'] = saocom_gdf['HEIGHT_RELATIVE'] + offset_tinitaly
# =============================================================================
# CALIBRATE SAOCOM TO COPERNICUS (Method 1: Constant Offset)
# =============================================================================
stable_mask_cop = (
(saocom_gdf['COHER'] >= 0.8) &
(saocom_gdf['copernicus_height'].notna()) &
(saocom_gdf['HEIGHT_RELATIVE'].notna()) &
(np.abs(saocom_gdf['HEIGHT_RELATIVE']) < 1000)
)
stable_points_cop = saocom_gdf[stable_mask_cop].copy()
height_diff_cop = stable_points_cop['copernicus_height'] - stable_points_cop['HEIGHT_RELATIVE']
offset_copernicus = np.median(height_diff_cop)
print(f"\nCOPERNICUS Calibration:")
print(f" Stable points used: {len(stable_points_cop):,}")
print(f" Constant offset: {offset_copernicus:.3f} m")
print(f" Offset std dev: {np.std(height_diff_cop):.3f} m")
saocom_gdf['HEIGHT_ABSOLUTE_COP'] = saocom_gdf['HEIGHT_RELATIVE'] + offset_copernicus
# =============================================================================
# VALIDATION: CALCULATE RMSE AT STABLE POINTS
# =============================================================================
# TINITALY validation
residuals_tin = stable_points_tin['tinitaly_height'] - (stable_points_tin['HEIGHT_RELATIVE'] + offset_tinitaly)
rmse_tin = np.sqrt(np.mean(residuals_tin**2))
# Copernicus validation
residuals_cop = stable_points_cop['copernicus_height'] - (stable_points_cop['HEIGHT_RELATIVE'] + offset_copernicus)
rmse_cop = np.sqrt(np.mean(residuals_cop**2))
print(f"\nValidation Results:")
print(f" TINITALY RMSE: {rmse_tin:.3f} m")
print(f" Copernicus RMSE: {rmse_cop:.3f} m")
print(f"\nRecommendation: Use HEIGHT_ABSOLUTE_TIN (lower RMSE expected)")
TINITALY Calibration: Stable points used: 46,920 Constant offset: 4.308 m Offset std dev: 4.918 m COPERNICUS Calibration: Stable points used: 46,939 Constant offset: 4.761 m Offset std dev: 4.160 m Validation Results: TINITALY RMSE: 4.955 m Copernicus RMSE: 4.160 m Recommendation: Use HEIGHT_ABSOLUTE_TIN (lower RMSE expected)
CREATE SAOCOM COVERAGE GRID¶
In [9]:
# =============================================================================
# CREATE SAOCOM COVERAGE GRID
# =============================================================================
# Initialize coverage array matching the 10m grid
saocom_coverage = np.zeros((grid_height, grid_width), dtype=bool)
# Convert SAOCOM points to grid indices
saocom_rows, saocom_cols = rowcol(target_transform,
saocom_gdf.geometry.x.values,
saocom_gdf.geometry.y.values)
# Mark cells with SAOCOM data
valid_indices = ((saocom_rows >= 0) & (saocom_rows < grid_height) &
(saocom_cols >= 0) & (saocom_cols < grid_width))
saocom_coverage[saocom_rows[valid_indices], saocom_cols[valid_indices]] = True
# =============================================================================
# CALCULATE OVERALL VOID STATISTICS
# =============================================================================
# Study area mask (inside hull, excluding nodata)
study_area_mask = hull_mask
# Void mask (study area cells without SAOCOM data)
void_mask = study_area_mask & (~saocom_coverage)
n_total_cells = np.sum(study_area_mask)
n_occupied_cells = np.sum(study_area_mask & saocom_coverage)
n_void_cells = np.sum(void_mask)
void_percentage = 100 * n_void_cells / n_total_cells if n_total_cells > 0 else 0
print(void_percentage)
# =============================================================================
# SAVE VOID MASK AS RASTER
# =============================================================================
void_mask_path = RESULTS_DIR / "saocom_void_mask.tif"
profile_void = profile.copy()
profile_void['dtype'] = 'uint8'
profile_void['nodata'] = 255
void_raster = np.full((grid_height, grid_width), 255, dtype=np.uint8)
void_raster[study_area_mask] = 0 # Data area
void_raster[void_mask] = 1 # Void cells
with rasterio.open(void_mask_path, 'w', **profile_void) as dst:
dst.write(void_raster, 1)
87.0403491489293
LOAD REFERENCE DEM DATA (Already in memory from Cell 4)¶
In [10]:
# =============================================================================
# LOAD REFERENCE DEM DATA (Already in memory from Cell 4)
# =============================================================================
tinitaly_data = tinitaly_10m.copy()
copernicus_data = copernicus_10m.copy()
# =============================================================================
# CALCULATE ELEVATION DIFFERENCE (TINITALY - COPERNICUS)
# =============================================================================
elevation_diff = tinitaly_data - copernicus_data
# =============================================================================
# CREATE VALID COMPARISON MASK
# =============================================================================
valid_mask = (tinitaly_data != NODATA) & (copernicus_data != NODATA)
# Extract valid data for statistics
# valid_pixels = np.sum(valid_mask)
valid_pixels = int(np.sum(valid_mask))
valid_diffs = elevation_diff[valid_mask]
valid_tinitaly = tinitaly_data[valid_mask]
valid_copernicus = copernicus_data[valid_mask]
print(valid_diffs)
# =============================================================================
# CALCULATE REFERENCE COMPARISON STATISTICS
# =============================================================================
ref_metrics = {
'n_pixels': int(valid_pixels),
'mean_diff': float(np.mean(valid_diffs)),
'median_diff': float(np.median(valid_diffs)),
'std_diff': float(np.std(valid_diffs)),
'rmse': float(np.sqrt(np.mean(valid_diffs**2))),
'mae': float(np.mean(np.abs(valid_diffs))),
'nmad': float(1.4826 * np.median(np.abs(valid_diffs - np.median(valid_diffs)))),
'min_diff': float(np.min(valid_diffs)),
'max_diff': float(np.max(valid_diffs)),
'correlation': float(np.corrcoef(valid_tinitaly, valid_copernicus)[0, 1])
}
# =============================================================================
# DEFINE EQUALITY TOLERANCE USING NMAD
# =============================================================================
# Use NMAD as statistical threshold for "roughly equal"
equal_tolerance = ref_metrics['nmad']
print(equal_tolerance)
# =============================================================================
# DIRECTIONAL COMPARISON GRIDS WITH EQUALITY BUFFER
# =============================================================================
# Where TINITALY significantly > Copernicus
tinitaly_higher_mask = (valid_mask) & (elevation_diff > equal_tolerance)
tinitaly_higher_data = np.full_like(elevation_diff, np.nan)
tinitaly_higher_data[tinitaly_higher_mask] = elevation_diff[tinitaly_higher_mask]
# Where TINITALY significantly < Copernicus
tinitaly_lower_mask = (valid_mask) & (elevation_diff < -equal_tolerance)
tinitaly_lower_data = np.full_like(elevation_diff, np.nan)
tinitaly_lower_data[tinitaly_lower_mask] = elevation_diff[tinitaly_lower_mask]
# Where TINITALY ≈ Copernicus (within tolerance)
roughly_equal_mask = (valid_mask) & (np.abs(elevation_diff) <= equal_tolerance)
roughly_equal_data = np.full_like(elevation_diff, np.nan)
roughly_equal_data[roughly_equal_mask] = elevation_diff[roughly_equal_mask]
# Pixel counts and percentages
higher_pixels = int(np.sum(tinitaly_higher_mask))
lower_pixels = int(np.sum(tinitaly_lower_mask))
equal_pixels = int(np.sum(roughly_equal_mask))
pct_higher = float(100 * higher_pixels / valid_pixels) if valid_pixels > 0 else 0.0
pct_lower = float(100 * lower_pixels / valid_pixels) if valid_pixels > 0 else 0.0
pct_equal = float(100 * equal_pixels / valid_pixels) if valid_pixels > 0 else 0.0
# =============================================================================
# HEIGHT STATISTICS COMPARISON
# =============================================================================
def calculate_height_stats(data, name):
"""Calculate comprehensive height statistics"""
valid_data = data[~np.isnan(data)]
if len(valid_data) == 0:
return None
stats = {
'Dataset': name,
'Count': len(valid_data),
'Min': np.min(valid_data),
'Max': np.max(valid_data),
'Mean': np.mean(valid_data),
'Median': np.median(valid_data),
'Std Dev': np.std(valid_data),
'Range': np.max(valid_data) - np.min(valid_data),
'Q25': np.percentile(valid_data, 25),
'Q75': np.percentile(valid_data, 75),
'IQR': np.percentile(valid_data, 75) - np.percentile(valid_data, 25)
}
return stats
# Collect statistics
stats_list = []
# SAOCOM relative heights
stats_list.append(calculate_height_stats(
saocom_gdf['HEIGHT_RELATIVE'].values,
'SAOCOM (Relative)'
))
# TINITALY sampled at SAOCOM points
stats_list.append(calculate_height_stats(
saocom_gdf['tinitaly_height'].values,
'TINITALY (at SAOCOM pts)'
))
# Copernicus sampled at SAOCOM points
stats_list.append(calculate_height_stats(
saocom_gdf['copernicus_height'].values,
'Copernicus (at SAOCOM pts)'
))
# TINITALY full raster (within study area)
tinitaly_valid = tinitaly_10m[tinitaly_10m != NODATA]
stats_list.append(calculate_height_stats(
tinitaly_valid,
'TINITALY (Full Grid)'
))
# Copernicus full raster (within study area)
copernicus_valid = copernicus_10m[copernicus_10m != NODATA]
stats_list.append(calculate_height_stats(
copernicus_valid,
'Copernicus (Full Grid)'
))
# Create DataFrame
stats_df = pd.DataFrame(stats_list)
# Display with formatting
print("\n" + "="*90)
print("HEIGHT STATISTICS SUMMARY (all values in meters)")
print("="*90)
print(stats_df.to_string(index=False, float_format=lambda x: f'{x:.2f}'))
print("="*90)
# Additional comparison: SAOCOM vs Reference DEMs
print("\nDIFFERENCE STATISTICS (SAOCOM Relative - Reference DEM):")
print("-"*90)
# SAOCOM - TINITALY
diff_tin = saocom_gdf['HEIGHT_RELATIVE'] - saocom_gdf['tinitaly_height']
diff_tin_valid = diff_tin.dropna()
print(f"\nSAOCOM - TINITALY:")
print(f" Mean difference: {diff_tin_valid.mean():+.3f} m")
print(f" Median difference: {diff_tin_valid.median():+.3f} m")
print(f" Std deviation: {diff_tin_valid.std():.3f} m")
print(f" RMSE: {np.sqrt((diff_tin_valid**2).mean()):.3f} m")
# SAOCOM - Copernicus
diff_cop = saocom_gdf['HEIGHT_RELATIVE'] - saocom_gdf['copernicus_height']
diff_cop_valid = diff_cop.dropna()
print(f"\nSAOCOM - Copernicus:")
print(f" Mean difference: {diff_cop_valid.mean():+.3f} m")
print(f" Median difference: {diff_cop_valid.median():+.3f} m")
print(f" Std deviation: {diff_cop_valid.std():.3f} m")
print(f" RMSE: {np.sqrt((diff_cop_valid**2).mean()):.3f} m")
# TINITALY - Copernicus (reference comparison)
ref_diff = (tinitaly_10m[valid_mask] - copernicus_10m[valid_mask])
ref_diff_valid = ref_diff[~np.isnan(ref_diff)]
print(f"\nTINITALY - Copernicus (Reference Check):")
print(f" Mean difference: {ref_diff_valid.mean():+.3f} m")
print(f" Median difference: {np.median(ref_diff_valid):+.3f} m")
print(f" Std deviation: {ref_diff_valid.std():.3f} m")
print(f" RMSE: {np.sqrt((ref_diff_valid**2).mean()):.3f} m")
[-14.70282 -11.711487 -9.1297 ... -0.7229309 -1.0675964
-1.7591248]
2.1790504455566406
==========================================================================================
HEIGHT STATISTICS SUMMARY (all values in meters)
==========================================================================================
Dataset Count Min Max Mean Median Std Dev Range Q25 Q75 IQR
SAOCOM (Relative) 66765 -562.00 1163.70 340.09 327.00 116.87 1725.70 252.00 418.80 166.80
TINITALY (at SAOCOM pts) 66690 99.32 825.80 343.79 330.04 116.54 726.48 255.95 421.12 165.17
Copernicus (at SAOCOM pts) 66765 100.09 826.59 345.34 331.56 116.86 726.50 256.84 423.97 167.13
TINITALY (Full Grid) 493124 98.84 826.76 354.23 341.34 129.15 727.92 256.91 442.43 185.52
Copernicus (Full Grid) 495376 99.60 827.02 356.89 343.88 129.64 727.42 258.68 446.53 187.85
==========================================================================================
DIFFERENCE STATISTICS (SAOCOM Relative - Reference DEM):
------------------------------------------------------------------------------------------
SAOCOM - TINITALY:
Mean difference: -3.911 m
Median difference: -4.383 m
Std deviation: 14.745 m
RMSE: 15.254 m
SAOCOM - Copernicus:
Mean difference: -5.251 m
Median difference: -4.939 m
Std deviation: 14.473 m
RMSE: 15.397 m
TINITALY - Copernicus (Reference Check):
Mean difference: -2.029 m
Median difference: -0.585 m
Std deviation: 4.222 m
RMSE: 4.684 m
LOAD DBF LOOKUP TABLE¶
In [11]:
# -----------------------------------------------------------------------------
# 3. LOAD DBF LOOKUP TABLE
# -----------------------------------------------------------------------------
dbf_table = DBF(corine_dbf_path, load=True)
lookup_df = pd.DataFrame(iter(dbf_table))
# Create mapping dictionaries
value_to_code = dict(zip(lookup_df['Value'], lookup_df['CODE_18']))
value_to_label = dict(zip(lookup_df['Value'], lookup_df['LABEL3']))
code_to_label = dict(zip(lookup_df['CODE_18'], lookup_df['LABEL3']))
# -----------------------------------------------------------------------------
# 4. LOAD, MASK, AND REMAP CORINE (OPTIMIZED)
# -----------------------------------------------------------------------------
with rasterio.open(corine_path) as src:
# Reproject hull to match CORINE CRS
hull_corine_crs = hull_gdf.to_crs(src.crs)
# Crop to study area using convex hull FIRST (faster processing)
corine_raw, crop_transform = mask(src, hull_corine_crs.geometry, crop=True, filled=False)
corine_raw = corine_raw[0] # Extract from (1, h, w) to (h, w)
corine_crs = src.crs
corine_res = src.res
corine_nodata = src.nodata if src.nodata is not None else 255
corine_bounds = src.bounds
# Remap values: Value → CODE_18 (on cropped array)
corine_remapped = np.full_like(corine_raw, 0, dtype=np.uint16)
for value, code in value_to_code.items():
corine_remapped[corine_raw == value] = code
corine_remapped[corine_raw == corine_nodata] = 0 # NoData = 0
# Save intermediate remapped version
corine_remapped_path = RESULTS_DIR / "corine_remapped_cropped.tif"
profile_remapped = {
'driver': 'GTiff', 'dtype': 'uint16',
'width': corine_remapped.shape[1], 'height': corine_remapped.shape[0],
'count': 1, 'crs': corine_crs, 'transform': crop_transform,
'nodata': 0, 'compress': 'lzw'
}
with rasterio.open(corine_remapped_path, 'w', **profile_remapped) as dst:
dst.write(corine_remapped, 1)
# -----------------------------------------------------------------------------
# 5. RESAMPLE TO 10M GRID
# -----------------------------------------------------------------------------
corine_10m = np.full((grid_height, grid_width), 0, dtype=np.uint16)
with rasterio.open(corine_remapped_path) as src:
reproject(
source=rasterio.band(src, 1),
destination=corine_10m,
src_transform=src.transform,
src_crs=src.crs,
dst_transform=target_transform,
dst_crs=TARGET_CRS,
resampling=Resampling.nearest, # Use nearest for categorical data
src_nodata=0,
dst_nodata=0
)
# Save resampled version
corine_10m_path = RESULTS_DIR / "corine_10m.tif"
profile_10m = {
'driver': 'GTiff', 'dtype': 'uint16',
'width': grid_width, 'height': grid_height,
'count': 1, 'crs': TARGET_CRS, 'transform': target_transform,
'nodata': 0, 'compress': 'lzw'
}
with rasterio.open(corine_10m_path, 'w', **profile_10m) as dst:
dst.write(corine_10m, 1)
# -----------------------------------------------------------------------------
# 6. MASK TO HULL
# -----------------------------------------------------------------------------
corine_10m_masked = corine_10m.copy()
corine_10m_masked[~hull_mask] = 0
# Save final masked version
corine_masked_path = RESULTS_DIR / "corine_10m_masked.tif"
with rasterio.open(corine_masked_path, 'w', **profile_10m) as dst:
dst.write(corine_10m_masked, 1)
# Update working array
corine_10m = corine_10m_masked
# -----------------------------------------------------------------------------
# SUMMARY
# -----------------------------------------------------------------------------
unique_codes = np.unique(corine_10m[corine_10m > 0])
print(f"\nCORINE Processing Complete:")
print(f" Original CRS: {corine_crs}")
print(f" Unique classes: {len(unique_codes)}")
print(f" Classes present: {sorted(unique_codes)}")
print(f" Final resolution: {GRID_SIZE}m")
print(f" Output path: {corine_masked_path}")
print(saocom_gdf)
CORINE Processing Complete:
Original CRS: IGNF:ETRS89LAEA
Unique classes: 10
Classes present: [np.uint16(112), np.uint16(221), np.uint16(223), np.uint16(231), np.uint16(242), np.uint16(243), np.uint16(311), np.uint16(312), np.uint16(313), np.uint16(331)]
Final resolution: 10m
Output path: results\corine_10m_masked.tif
ID SVET LVET LAT_old LAT LON_old LON HEIGHT \
0 1 221 540 45.472638 45.471953 11.136250 11.130792 97.7
1 2 329 540 45.473973 45.473288 11.146259 11.140801 114.9
2 3 396 540 45.475019 45.474334 11.154119 11.148661 207.6
3 4 399 540 45.475076 45.474391 11.154547 11.149089 215.5
4 5 402 540 45.475128 45.474443 11.154941 11.149483 221.8
... ... ... ... ... ... ... ... ...
66760 66787 699 2500 45.534820 45.534135 11.171112 11.165654 465.8
66761 66788 711 2500 45.535035 45.534350 11.172733 11.167275 493.5
66762 66789 765 2500 45.535593 45.534908 11.176933 11.171475 463.9
66763 66790 927 2500 45.537737 45.537052 11.193099 11.187641 554.3
66764 66791 1006 2500 45.539339 45.538654 11.205214 11.199756 811.3
HEIGHT_WRT_DEM SIGMA_HEIGHT COHER geometry \
0 97.7 2.288012 0.71 POINT (666554.015 5037588.965)
1 114.9 2.207678 0.77 POINT (667332.413 5037758.078)
2 207.6 1.852991 0.90 POINT (667943.665 5037890.687)
3 215.5 1.924608 0.90 POINT (667976.949 5037897.914)
4 221.8 2.133719 0.83 POINT (668007.59 5037904.515)
... ... ... ... ...
66760 465.8 1.976760 0.91 POINT (669092.656 5044569.999)
66761 493.5 1.936943 0.94 POINT (669218.576 5044597.302)
66762 463.9 1.876382 0.89 POINT (669544.828 5044668.159)
66763 554.3 1.962382 0.81 POINT (670800.535 5044940.634)
66764 811.3 2.671757 0.76 POINT (671741.529 5045144.47)
x_utm y_utm tinitaly_height copernicus_height \
0 666554.014609 5.037589e+06 100.003799 100.706978
1 667332.413137 5.037758e+06 118.215897 116.914986
2 667943.665216 5.037891e+06 212.287094 213.647568
3 667976.948898 5.037898e+06 211.314804 218.554688
4 668007.589893 5.037905e+06 217.124496 226.343140
... ... ... ... ...
66760 669092.655867 5.044570e+06 467.319885 470.105194
66761 669218.576178 5.044597e+06 508.733185 505.111084
66762 669544.827517 5.044668e+06 467.017303 467.463135
66763 670800.534953 5.044941e+06 554.237671 552.381348
66764 671741.528756 5.045144e+06 818.630981 822.318176
HEIGHT_RELATIVE HEIGHT_ABSOLUTE_TIN HEIGHT_ABSOLUTE_COP
0 97.7 102.008191 102.4612
1 114.9 119.208191 119.6612
2 207.6 211.908191 212.3612
3 215.5 219.808191 220.2612
4 221.8 226.108191 226.5612
... ... ... ...
66760 465.8 470.108191 470.5612
66761 493.5 497.808191 498.2612
66762 463.9 468.208191 468.6612
66763 554.3 558.608191 559.0612
66764 811.3 815.608191 816.0612
[66765 rows x 19 columns]
SIMPLE SPATIAL OVERLAP VISUALIZATION¶
In [12]:
# =============================================================================
# SIMPLE SPATIAL OVERLAP VISUALIZATION
# =============================================================================
import matplotlib.pyplot as plt
import rasterio
from rasterio.plot import show
fig, ax = plt.subplots(1, 1, figsize=(12, 10), facecolor='white')
ax.set_facecolor('white')
# 1. Plot original TINITALY DEM
with rasterio.open(tinitaly_path) as src:
# Reproject to target CRS for comparison
from rasterio.warp import transform_bounds
# Get DEM bounds in target CRS
dem_bounds_target = transform_bounds(src.crs, TARGET_CRS, *src.bounds)
print(f"TINITALY bounds (original CRS): {src.bounds}")
print(f"TINITALY bounds (UTM 32N): {dem_bounds_target}")
print(f"TINITALY CRS: {src.crs}")
# Read a downsampled version for quick plotting
dem_data = src.read(1, out_shape=(
int(src.height / 10),
int(src.width / 10)
))
# Plot DEM extent as a rectangle
from matplotlib.patches import Rectangle
dem_rect = Rectangle(
(dem_bounds_target[0], dem_bounds_target[1]),
dem_bounds_target[2] - dem_bounds_target[0],
dem_bounds_target[3] - dem_bounds_target[1],
linewidth=3, edgecolor='blue', facecolor='none',
label='TINITALY Extent'
)
ax.add_patch(dem_rect)
# 2. Plot SAOCOM points
saocom_gdf.plot(ax=ax, markersize=1, color='red', alpha=0.5, label='SAOCOM Points')
# 3. Plot study area (convex hull)
hull_gdf.boundary.plot(ax=ax, color='green', linewidth=2, linestyle='--', label='Study Area Hull')
# Labels and formatting
ax.set_xlabel('UTM Easting (m)', fontsize=11)
ax.set_ylabel('UTM Northing (m)', fontsize=11)
ax.set_title('Spatial Coverage: SAOCOM vs TINITALY DEM', fontweight='bold', fontsize=14)
ax.legend(loc='best', fontsize=10)
ax.grid(True, alpha=0.3, color='gray')
# Add text box with extents
info_text = f"""SAOCOM Extent:
X: [{saocom_gdf.geometry.x.min():.0f}, {saocom_gdf.geometry.x.max():.0f}]
Y: [{saocom_gdf.geometry.y.min():.0f}, {saocom_gdf.geometry.y.max():.0f}]
TINITALY Extent (UTM 32N):
X: [{dem_bounds_target[0]:.0f}, {dem_bounds_target[2]:.0f}]
Y: [{dem_bounds_target[1]:.0f}, {dem_bounds_target[3]:.0f}]
"""
ax.text(0.02, 0.98, info_text, transform=ax.transAxes,
fontsize=9, verticalalignment='top', fontfamily='monospace',
bbox=dict(boxstyle='round', facecolor='white', alpha=0.9, edgecolor='black'))
plt.tight_layout()
plt.show()
# =============================================================================
# CHECK IF EXTENTS OVERLAP
# =============================================================================
print("\n=== OVERLAP CHECK ===")
overlap_x = not (saocom_gdf.geometry.x.max() < dem_bounds_target[0] or
saocom_gdf.geometry.x.min() > dem_bounds_target[2])
overlap_y = not (saocom_gdf.geometry.y.max() < dem_bounds_target[1] or
saocom_gdf.geometry.y.min() > dem_bounds_target[3])
print(f"X-axis overlap: {overlap_x}")
print(f"Y-axis overlap: {overlap_y}")
print(f"Full overlap: {overlap_x and overlap_y}")
if not (overlap_x and overlap_y):
print("\n⚠️ NO OVERLAP DETECTED - SAOCOM data is outside TINITALY coverage!")
print("You need a different TINITALY tile that covers this area.")
TINITALY bounds (original CRS): BoundingBox(left=663160.0, bottom=5035530.0, right=674300.0, top=5046650.0) TINITALY bounds (UTM 32N): (663160.0, 5035530.0, 674300.0, 5046650.0) TINITALY CRS: EPSG:32632
=== OVERLAP CHECK === X-axis overlap: True Y-axis overlap: True Full overlap: True
COMPREHENSIVE REFERENCE DEM COMPARISON VISUALIZATION¶
In [13]:
# =============================================================================
# COMPREHENSIVE REFERENCE DEM COMPARISON VISUALIZATION
# =============================================================================
fig, axes = plt.subplots(4, 2, figsize=(20, 28), facecolor='white')
extent = [xmin_grid, xmax_grid, ymin_grid, ymax_grid]
# Plot 1: TINITALY elevation
ax = axes[0, 0]
ax.set_facecolor('white')
tinitaly_display = np.ma.masked_where(tinitaly_data == NODATA, tinitaly_data)
cmap1 = plt.cm.terrain.copy()
cmap1.set_bad(color='white', alpha=0)
im1 = ax.imshow(tinitaly_display, cmap=cmap1, origin='upper', extent=extent)
hull_gdf.boundary.plot(ax=ax, color='darkred', linewidth=2.5, label='Study Area')
ax.set_title('TINITALY Elevation', fontweight='bold', fontsize=12, color='black')
ax.set_xlabel('UTM Easting (m)', color='black')
ax.set_ylabel('UTM Northing (m)', color='black')
ax.grid(True, color='black', alpha=0.3, linewidth=0.5)
ax.tick_params(colors='black')
cbar1 = plt.colorbar(im1, ax=ax, label='Elevation (m)', shrink=0.8)
cbar1.ax.yaxis.label.set_color('black')
cbar1.ax.tick_params(colors='black')
stats1 = f"""Min: {np.nanmin(tinitaly_display):.1f}m
Max: {np.nanmax(tinitaly_display):.1f}m
Mean: {np.nanmean(tinitaly_display):.1f}m
Std: {np.nanstd(tinitaly_display):.1f}m"""
ax.text(0.02, 0.98, stats1, transform=ax.transAxes, fontsize=9, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='white', alpha=0.9, edgecolor='black'))
# Plot 2: Copernicus elevation
ax = axes[0, 1]
ax.set_facecolor('white')
copernicus_display = np.ma.masked_where(copernicus_data == NODATA, copernicus_data)
cmap2 = plt.cm.terrain.copy()
cmap2.set_bad(color='white', alpha=0)
im2 = ax.imshow(copernicus_display, cmap=cmap2, origin='upper', extent=extent)
hull_gdf.boundary.plot(ax=ax, color='darkred', linewidth=2.5, label='Study Area')
ax.set_title('Copernicus Elevation', fontweight='bold', fontsize=12, color='black')
ax.set_xlabel('UTM Easting (m)', color='black')
ax.set_ylabel('UTM Northing (m)', color='black')
ax.grid(True, color='black', alpha=0.3, linewidth=0.5)
ax.tick_params(colors='black')
cbar2 = plt.colorbar(im2, ax=ax, label='Elevation (m)', shrink=0.8)
cbar2.ax.yaxis.label.set_color('black')
cbar2.ax.tick_params(colors='black')
stats2 = f"""Min: {np.nanmin(copernicus_display):.1f}m
Max: {np.nanmax(copernicus_display):.1f}m
Mean: {np.nanmean(copernicus_display):.1f}m
Std: {np.nanstd(copernicus_display):.1f}m"""
ax.text(0.02, 0.98, stats2, transform=ax.transAxes, fontsize=9, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='white', alpha=0.9, edgecolor='black'))
# Plot 3: Difference map (full)
ax = axes[1, 0]
ax.set_facecolor('white')
diff_display = np.ma.masked_where(~valid_mask, elevation_diff)
diff_limit = np.percentile(np.abs(valid_diffs), 95)
cmap3 = plt.cm.coolwarm.copy()
cmap3.set_bad(color='white', alpha=0)
im3 = ax.imshow(diff_display, cmap=cmap3, origin='upper', extent=extent,
vmin=-diff_limit, vmax=diff_limit)
hull_gdf.boundary.plot(ax=ax, color='darkred', linewidth=2.5, label='Study Area')
ax.set_title(f'Elevation Difference\n(TINITALY - Copernicus)', fontweight='bold', fontsize=12, color='black')
ax.set_xlabel('UTM Easting (m)', color='black')
ax.set_ylabel('UTM Northing (m)', color='black')
ax.grid(True, color='black', alpha=0.3, linewidth=0.5)
ax.tick_params(colors='black')
cbar3 = plt.colorbar(im3, ax=ax, label='Difference (m)', shrink=0.8)
cbar3.ax.yaxis.label.set_color('black')
cbar3.ax.tick_params(colors='black')
stats3 = f"""Pixels: {valid_pixels:,}
Mean: {ref_metrics['mean_diff']:+.2f}m
RMSE: {ref_metrics['rmse']:.2f}m
NMAD: {ref_metrics['nmad']:.2f}m
MAE: {ref_metrics['mae']:.2f}m
Std: {ref_metrics['std_diff']:.2f}m
Corr: {ref_metrics['correlation']:.3f}
Range: [{ref_metrics['min_diff']:.1f}, {ref_metrics['max_diff']:.1f}]m"""
ax.text(0.02, 0.98, stats3, transform=ax.transAxes, fontsize=8, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='white', alpha=0.9, edgecolor='black'))
# Plot 4: Statistics summary
ax = axes[1, 1]
ax.set_facecolor('white')
le68 = np.percentile(np.abs(valid_diffs), 68.27)
le90 = np.percentile(np.abs(valid_diffs), 90)
le95 = np.percentile(np.abs(valid_diffs), 95)
stats_text = f"""REFERENCE DEM COMPARISON
Valid Pixels: {valid_pixels:,}
CRITICAL METRICS:
Mean Error (Bias): {ref_metrics['mean_diff']:+.2f} m
RMSE: {ref_metrics['rmse']:.2f} m
NMAD (robust): {ref_metrics['nmad']:.2f} m
Std Deviation: {ref_metrics['std_diff']:.2f} m
SECONDARY METRICS:
MAE: {ref_metrics['mae']:.2f} m
Correlation: {ref_metrics['correlation']:.4f}
LE68: {le68:.2f} m
LE90: {le90:.2f} m
LE95: {le95:.2f} m
Median: {ref_metrics['median_diff']:+.2f} m
DIRECTIONAL BREAKDOWN:
(Tolerance: ±{equal_tolerance:.2f} m)
TINITALY Higher: {higher_pixels:,} ({pct_higher:.1f}%)
Copernicus Higher: {lower_pixels:,} ({pct_lower:.1f}%)
Roughly Equal: {equal_pixels:,} ({pct_equal:.1f}%)
Range: {ref_metrics['min_diff']:+.1f} to {ref_metrics['max_diff']:+.1f} m
"""
ax.text(0.05, 0.5, stats_text, transform=ax.transAxes,
fontfamily='monospace', fontsize=9, verticalalignment='center', color='black')
ax.axis('off')
ax.set_title('Summary Statistics', fontweight='bold', fontsize=12, color='black')
# Plot 5: Where TINITALY > Copernicus
ax = axes[2, 0]
ax.set_facecolor('white')
cmap5 = plt.cm.YlOrRd.copy()
cmap5.set_bad(color='white', alpha=0)
im5 = ax.imshow(tinitaly_higher_data, cmap=cmap5, origin='upper', extent=extent,
vmin=0, vmax=np.nanmax(tinitaly_higher_data))
hull_gdf.boundary.plot(ax=ax, color='darkred', linewidth=2.5, label='Study Area')
ax.set_title(f'TINITALY > Copernicus', fontweight='bold', fontsize=12, color='black')
ax.set_xlabel('UTM Easting (m)', color='black')
ax.set_ylabel('UTM Northing (m)', color='black')
ax.grid(True, color='black', alpha=0.3, linewidth=0.5)
ax.tick_params(colors='black')
cbar5 = plt.colorbar(im5, ax=ax, label='Difference (m)', shrink=0.8)
cbar5.ax.yaxis.label.set_color('black')
cbar5.ax.tick_params(colors='black')
higher_vals = tinitaly_higher_data[~np.isnan(tinitaly_higher_data)]
stats5 = f"""Pixels: {higher_pixels:,} ({pct_higher:.1f}%)
Mean: {np.mean(higher_vals):.2f}m
Std: {np.std(higher_vals):.2f}m
RMSE: {np.sqrt(np.mean(higher_vals**2)):.2f}m
Max: {np.max(higher_vals):.2f}m"""
ax.text(0.02, 0.98, stats5, transform=ax.transAxes, fontsize=8, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='white', alpha=0.9, edgecolor='black'))
# Plot 6: Where TINITALY < Copernicus
ax = axes[2, 1]
ax.set_facecolor('white')
cmap6 = plt.cm.Blues_r.copy()
cmap6.set_bad(color='white', alpha=0)
im6 = ax.imshow(tinitaly_lower_data, cmap=cmap6, origin='upper', extent=extent,
vmin=np.nanmin(tinitaly_lower_data), vmax=0)
hull_gdf.boundary.plot(ax=ax, color='darkred', linewidth=2.5, label='Study Area')
ax.set_title(f'Copernicus > TINITALY', fontweight='bold', fontsize=12, color='black')
ax.set_xlabel('UTM Easting (m)', color='black')
ax.set_ylabel('UTM Northing (m)', color='black')
ax.grid(True, color='black', alpha=0.3, linewidth=0.5)
ax.tick_params(colors='black')
cbar6 = plt.colorbar(im6, ax=ax, label='Difference (m)', shrink=0.8)
cbar6.ax.yaxis.label.set_color('black')
cbar6.ax.tick_params(colors='black')
lower_vals = tinitaly_lower_data[~np.isnan(tinitaly_lower_data)]
stats6 = f"""Pixels: {lower_pixels:,} ({pct_lower:.1f}%)
Mean: {np.mean(lower_vals):.2f}m
Std: {np.std(lower_vals):.2f}m
RMSE: {np.sqrt(np.mean(lower_vals**2)):.2f}m
Min: {np.min(lower_vals):.2f}m"""
ax.text(0.02, 0.98, stats6, transform=ax.transAxes, fontsize=8, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='white', alpha=0.9, edgecolor='black'))
# Plot 7: Where roughly equal
ax = axes[3, 0]
ax.set_facecolor('white')
cmap7 = plt.cm.Greens.copy()
cmap7.set_bad(color='white', alpha=0)
im7 = ax.imshow(roughly_equal_data, cmap=cmap7, origin='upper', extent=extent,
vmin=-equal_tolerance, vmax=equal_tolerance)
hull_gdf.boundary.plot(ax=ax, color='darkred', linewidth=2.5, label='Study Area')
ax.set_title(f'Roughly Equal (±{equal_tolerance:.2f}m)', fontweight='bold', fontsize=12, color='black')
ax.set_xlabel('UTM Easting (m)', color='black')
ax.set_ylabel('UTM Northing (m)', color='black')
ax.grid(True, color='black', alpha=0.3, linewidth=0.5)
ax.tick_params(colors='black')
cbar7 = plt.colorbar(im7, ax=ax, label='Difference (m)', shrink=0.8)
cbar7.ax.yaxis.label.set_color('black')
cbar7.ax.tick_params(colors='black')
equal_vals = roughly_equal_data[~np.isnan(roughly_equal_data)]
stats7 = f"""Pixels: {equal_pixels:,} ({pct_equal:.1f}%)
Mean: {np.mean(equal_vals):.2f}m
Std: {np.std(equal_vals):.2f}m
RMSE: {np.sqrt(np.mean(equal_vals**2)):.2f}m
MAE: {np.mean(np.abs(equal_vals)):.2f}m"""
ax.text(0.02, 0.98, stats7, transform=ax.transAxes, fontsize=8, verticalalignment='top',
bbox=dict(boxstyle='round', facecolor='white', alpha=0.9, edgecolor='black'))
# # Plot 8: Histogram of differences
# ax = axes[3, 1]
# ax.set_facecolor('white')
# ax.hist(valid_diffs, bins=100, alpha=0.7, color='steelblue', edgecolor='black')
# ax.set_xlim(valid_diffs.min() * 1.05, valid_diffs.max() * 1.05)
# ax.axvline(0, color='red', linestyle='--', linewidth=2, label='Zero')
# ax.axvline(ref_metrics['mean_diff'], color='green', linestyle='-', linewidth=2,
# label=f'Mean: {ref_metrics["mean_diff"]:+.2f}m')
# ax.axvline(equal_tolerance, color='orange', linestyle='--', linewidth=1.5,
# label=f'±{equal_tolerance:.2f}m')
# ax.axvline(-equal_tolerance, color='orange', linestyle='--', linewidth=1.5)
# ax.set_xlabel('Elevation Difference (m)', color='black')
# ax.set_ylabel('Frequency', color='black')
# ax.set_title('Difference Distribution', fontweight='bold', fontsize=12, color='black')
# ax.tick_params(colors='black')
# ax.legend(loc='upper right', fontsize=9)
# ax.grid(True, alpha=0.3, color='black', linewidth=0.5)
# for spine in ax.spines.values():
# spine.set_edgecolor('black')
# ax.set_yscale('log')
# plt.tight_layout()
# plt.show()
# Plot 8: Histogram of differences
ax = axes[3, 1]
ax.set_facecolor('white')
# Create histogram
n, bins, patches = ax.hist(valid_diffs, bins=50, alpha=0.75,
color='steelblue', edgecolor='black')
# Add reference lines
ax.axvline(0, color='red', linestyle='--', linewidth=2, label='Zero')
ax.axvline(ref_metrics['mean_diff'], color='green', linestyle='-', linewidth=2,
label=f'Mean: {ref_metrics["mean_diff"]:+.2f}m')
ax.axvline(equal_tolerance, color='orange', linestyle='--', linewidth=1.5,
label=f'±NMAD: {equal_tolerance:.2f}m')
ax.axvline(-equal_tolerance, color='orange', linestyle='--', linewidth=1.5)
# --- ADD STATISTICS BOX ---
# Calculate the required statistics directly from the data
stats_text = (f"Mean = {valid_diffs.mean():+.2f} m\n"
f"Std Dev = {valid_diffs.std():.2f} m\n"
f"Min = {valid_diffs.min():.2f} m\n"
f"Max = {valid_diffs.max():.2f} m\n"
f"RMSE = {ref_metrics['rmse']:.2f} m\n"
f"NMAD = {ref_metrics['nmad']:.2f} m")
# Place text box in the top-right corner
ax.text(0.97, 0.97, stats_text, transform=ax.transAxes, fontsize=10,
verticalalignment='top', horizontalalignment='right',
bbox=dict(boxstyle='round', facecolor='white', alpha=0.8, edgecolor='black'))
# --- SET AXIS LIMITS TO DATA EXTENT ---
x_min = float(valid_diffs.min())
x_max = float(valid_diffs.max())
x_padding = (x_max - x_min) * 0.05 # Increased padding slightly
ax.set_xlim(x_min - x_padding, x_max + x_padding)
# --- LABELS AND STYLING ---
ax.set_xlabel('Elevation Difference (m)', color='black', fontsize=11)
ax.set_ylabel('Frequency', color='black', fontsize=11)
ax.set_title('Difference Distribution', fontweight='bold', fontsize=12, color='black')
ax.tick_params(colors='black')
ax.set_yscale('log')
ax.legend(loc='upper left', fontsize=9)
ax.grid(True, alpha=0.3, color='black', linewidth=0.5)
for spine in ax.spines.values():
spine.set_edgecolor('black')
# =============================================================================
# 1. PREPARE DIFFERENCE DATA
# =============================================================================
# Calculate differences using calibrated SAOCOM heights
saocom_gdf['diff_tinitaly'] = saocom_gdf['HEIGHT_ABSOLUTE_TIN'] - saocom_gdf['tinitaly_height']
saocom_gdf['diff_copernicus'] = saocom_gdf['HEIGHT_ABSOLUTE_COP'] - saocom_gdf['copernicus_height']
# Create coherence bins if not already present
if 'coherence_bin' not in saocom_gdf.columns:
coherence_bins = [0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
coherence_labels = [f"{coherence_bins[i]:.1f}-{coherence_bins[i+1]:.1f}"
for i in range(len(coherence_bins)-1)]
saocom_gdf['coherence_bin'] = pd.cut(saocom_gdf['COHER'],
bins=coherence_bins,
labels=coherence_labels,
include_lowest=True)
# =============================================================================
# 2. BASIC VIOLIN PLOTS - ALL REFERENCE COMPARISONS
# =============================================================================
fig, ax = plt.subplots(1, 1, figsize=(12, 8), facecolor='white')
# Prepare data
plot_data = pd.DataFrame({
'SAOCOM - TINITALY': saocom_gdf['diff_tinitaly'],
'SAOCOM - Copernicus': saocom_gdf['diff_copernicus']
})
# Create violin plot
parts = ax.violinplot([plot_data['SAOCOM - TINITALY'].dropna(),
plot_data['SAOCOM - Copernicus'].dropna()],
positions=[1, 2],
showmeans=True,
showmedians=True,
showextrema=True)
padding = (x_max - x_min) * 0.05
plt.xlim(x_min - padding, x_max + padding)
# Color the violins
colors = ['#4472C4', '#ED7D31']
for pc, color in zip(parts['bodies'], colors):
pc.set_facecolor(color)
pc.set_alpha(0.7)
pc.set_edgecolor('black')
# Style other elements
for partname in ('cbars', 'cmins', 'cmaxes', 'cmedians', 'cmeans'):
if partname in parts:
parts[partname].set_edgecolor('black')
parts[partname].set_linewidth(1.5)
ax.axhline(y=0, color='red', linestyle='--', linewidth=1.5, alpha=0.7, label='Zero')
ax.set_xticks([1, 2])
ax.set_xticklabels(['SAOCOM -\nTINITALY', 'SAOCOM -\nCopernicus'], fontsize=11)
ax.set_ylabel('Elevation Difference (m)', fontsize=12)
ax.set_title('Distribution of Elevation Differences', fontweight='bold', fontsize=14)
ax.grid(axis='y', linestyle='--', alpha=0.3)
ax.legend()
# Add statistics
for i, col in enumerate(['SAOCOM - TINITALY', 'SAOCOM - Copernicus'], 1):
data = plot_data[col].dropna()
stats_text = f"n={len(data):,}\nμ={data.mean():.2f}m\nσ={data.std():.2f}m"
ax.text(i, data.max()*0.9, stats_text, ha='center', fontsize=9,
bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
plt.tight_layout()
plt.show()
# =============================================================================
# 3. VIOLIN PLOTS BY COHERENCE BINS
# =============================================================================
fig, axes = plt.subplots(1, 2, figsize=(16, 7), facecolor='white')
for idx, (col, title) in enumerate([('diff_tinitaly', 'SAOCOM - TINITALY'),
('diff_copernicus', 'SAOCOM - Copernicus')]):
ax = axes[idx]
# Filter data with valid coherence bins and differences
plot_df = saocom_gdf[saocom_gdf['coherence_bin'].notna() &
saocom_gdf[col].notna()].copy()
if len(plot_df) > 0:
sns.violinplot(x='coherence_bin', y=col, data=plot_df,
inner='quartile', palette='viridis', ax=ax)
y_min = plot_df[col].min()
y_max = plot_df[col].max()
padding = (y_max - y_min) * 0.05
ax.set_ylim(y_min - padding, y_max + padding)
ax.axhline(y=0, color='red', linestyle='--', linewidth=1.5, alpha=0.7)
ax.set_xlabel('Coherence Bin', fontsize=11)
ax.set_ylabel('Difference (m)', fontsize=11)
ax.set_title(f'{title} by Coherence', fontweight='bold', fontsize=12)
ax.grid(axis='y', linestyle='--', alpha=0.3)
# Add sample counts
bin_counts = plot_df['coherence_bin'].value_counts().sort_index()
labels = [f"{label}\n(n={bin_counts.get(label, 0):,})"
for label in coherence_labels]
ax.set_xticklabels(labels, rotation=45, ha='right', fontsize=9)
plt.tight_layout()
plt.show()
# =============================================================================
# 4. SUMMARY STATISTICS TABLE
# =============================================================================
summary_stats = []
for comparison in ['diff_tinitaly', 'diff_copernicus']:
data = saocom_gdf[comparison].dropna()
name = 'SAOCOM - TINITALY' if 'tinitaly' in comparison else 'SAOCOM - Copernicus'
summary_stats.append({
'Comparison': name,
'N Points': f"{len(data):,}",
'Mean': f"{data.mean():+.2f} m",
'Median': f"{data.median():+.2f} m",
'Std Dev': f"{data.std():.2f} m",
'RMSE': f"{np.sqrt((data**2).mean()):.2f} m",
'Range': f"[{data.min():.1f}, {data.max():.1f}] m"
})
summary_df = pd.DataFrame(summary_stats)
print("\n" + "="*80)
print("VIOLIN PLOT SUMMARY STATISTICS")
print("="*80)
print(summary_df.to_string(index=False))
print("="*80)
================================================================================
VIOLIN PLOT SUMMARY STATISTICS
================================================================================
Comparison N Points Mean Median Std Dev RMSE Range
SAOCOM - TINITALY 66,690 +0.40 m -0.08 m 14.74 m 14.75 m [-837.1, 643.6] m
SAOCOM - Copernicus 66,765 -0.49 m -0.18 m 14.47 m 14.48 m [-837.5, 639.7] m
================================================================================
SAOCOM HEIGHT RESIDUAL OUTLIER DETECTION AND VISUALIZATION¶
In [14]:
import numpy as np
import geopandas as gpd
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
def score_outliers_isolation_forest(
gdf: gpd.GeoDataFrame,
residual_col: str,
**kwargs
) -> gpd.GeoDataFrame:
"""
Assigns an anomaly score to each point using the Isolation Forest algorithm.
This function is unchanged and remains the core of the detection process.
Args:
gdf (gpd.GeoDataFrame): The input GeoDataFrame.
residual_col (str): The name of the column containing the height residuals.
**kwargs: Additional keyword arguments to pass to the IsolationForest model.
Returns:
gpd.GeoDataFrame: The original GeoDataFrame with a new 'outlier_score' column.
"""
if gdf.empty or residual_col not in gdf.columns:
print("Warning: Input GeoDataFrame is empty or residual column not found.")
gdf['outlier_score'] = np.nan
return gdf
points_3d = np.c_[
gdf.geometry.x,
gdf.geometry.y,
gdf[residual_col].fillna(0)
]
scaler = StandardScaler()
points_3d_scaled = scaler.fit_transform(points_3d)
model_params = {'n_estimators': 100, 'contamination': 'auto', 'random_state': 42, 'n_jobs': -1}
model_params.update(kwargs)
print("Fitting Isolation Forest model...")
model = IsolationForest(**model_params)
model.fit(points_3d_scaled)
gdf_scored = gdf.copy()
gdf_scored['outlier_score'] = model.decision_function(points_3d_scaled)
print("Scoring complete. Added 'outlier_score' column.")
return gdf_scored
def filter_by_score_iqr(
gdf_scored: gpd.GeoDataFrame,
iqr_multiplier: float = 1
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
"""
Filters a scored GeoDataFrame using a statistically-driven threshold.
This function calculates the outlier threshold by applying the IQR method
to the 'outlier_score' column itself. This removes the need for an
arbitrary percentage cutoff.
Args:
gdf_scored (gpd.GeoDataFrame): GeoDataFrame with an 'outlier_score' column.
iqr_multiplier (float): The multiplier for the IQR to define the cutoff.
A standard value is 1.5.
Returns:
tuple: A tuple containing the cleaned GeoDataFrame and the outlier GeoDataFrame.
"""
if 'outlier_score' not in gdf_scored.columns:
raise ValueError("Input GeoDataFrame must have an 'outlier_score' column.")
scores = gdf_scored['outlier_score']
q1 = scores.quantile(0.25)
q3 = scores.quantile(0.75)
iqr = q3 - q1
# The threshold is statistically determined from the distribution of scores
score_threshold = q1 - (iqr_multiplier * iqr)
print(score_threshold)
is_outlier_mask = scores < score_threshold
outliers = gdf_scored[is_outlier_mask].copy()
gdf_cleaned = gdf_scored[~is_outlier_mask].copy()
total_points, n_outliers = len(gdf_scored), len(outliers)
pct_outliers = (n_outliers / total_points) * 100 if total_points > 0 else 0
print("\n" + "=" * 60)
print(f"Statistically-Driven Filtering (IQR on Anomaly Scores)")
print("-" * 60)
print(f"Score Q1: {q1:.4f}, Q3: {q3:.4f}, IQR: {iqr:.4f}")
print(f"IQR Multiplier: {iqr_multiplier}")
print(f"Calculated Score Threshold: < {score_threshold:.4f}")
print("-" * 60)
print(f"Total Points: {total_points:,}")
print(f"Outliers Removed: {n_outliers:,} ({pct_outliers:.2f}%)")
print(f"Remaining Points: {len(gdf_cleaned):,}")
print("=" * 60)
return gdf_cleaned, outliers
def visualize_outlier_results(
gdf_original: gpd.GeoDataFrame,
gdf_cleaned: gpd.GeoDataFrame,
outliers: gpd.GeoDataFrame,
residual_col: str
):
"""
Creates a two-panel plot to visualize the outlier removal results.
(This function is unchanged).
"""
fig, axes = plt.subplots(1, 2, figsize=(20, 9), facecolor='white', gridspec_kw={'width_ratios': [1.2, 1]})
# --- Panel 1: Spatial Distribution ---
ax1 = axes[0]
vmin, vmax = np.nanpercentile(gdf_cleaned[residual_col], [2, 98])
scatter = ax1.scatter(
gdf_cleaned.geometry.x, gdf_cleaned.geometry.y,
c=gdf_cleaned[residual_col], cmap='RdBu_r', s=5,
vmin=vmin, vmax=vmax, alpha=0.8, label='Cleaned Data'
)
plt.colorbar(scatter, ax=ax1, label=f'Residual ({residual_col}) (m)', shrink=0.7)
if not outliers.empty:
outliers.plot(ax=ax1, markersize=25, color='yellow',
edgecolors='black', linewidth=0.8,
label=f'Outliers (n={len(outliers):,})', zorder=5)
ax1.set_title('Spatial Distribution of Cleaned Data and Outliers', fontweight='bold')
ax1.set_xlabel('UTM Easting (m)')
ax1.set_ylabel('UTM Northing (m)')
ax1.legend()
ax1.grid(True, linestyle='--', alpha=0.4)
ax1.tick_params(axis='x', rotation=45)
ax1.set_aspect('equal', adjustable='box')
# --- Panel 2: Residual Distribution Histogram ---
ax2 = axes[1]
original_residuals = gdf_original[residual_col].dropna()
cleaned_residuals = gdf_cleaned[residual_col].dropna()
ax2.hist(original_residuals, bins=100, alpha=0.5, label=f'Before (n={len(original_residuals):,})', color='gray')
ax2.hist(cleaned_residuals, bins=50, alpha=1, label=f'After (n={len(cleaned_residuals):,})', color='#2E86AB')
ax2.set_title('Residual Distribution Before and After Cleaning', fontweight='bold')
ax2.set_xlabel(f'Residual ({residual_col}) (m)')
ax2.set_ylabel('Frequency (Log Scale)')
ax2.set_yscale('log')
ax2.legend()
ax2.grid(True, linestyle='--', alpha=0.4)
plt.tight_layout()
plt.show()
# # =============================================================================
# # EXAMPLE USAGE
# # =============================================================================
# if __name__ == '__main__':
# # 1. Create a sample GeoDataFrame for demonstration
# print("Creating sample GeoDataFrame for demonstration...")
# np.random.seed(42)
# clean_points = np.random.rand(1000, 2) * 1000
# clean_residuals = np.random.normal(0, 2, 1000)
# outlier_points = np.array([[500, 500], [100, 900], [900, 100], [10, 10]])
# outlier_residuals = np.array([80, -65, 95, 5])
#
# all_coords = np.vstack([clean_points, outlier_points])
# all_residuals = np.concatenate([clean_residuals, outlier_residuals])
# sample_gdf = gpd.GeoDataFrame(
# {'diff_tinitaly': all_residuals},
# geometry=[Point(x, y) for x, y in all_coords]
# )
# --- Step 1: Score all the points ---
print(len(saocom_gdf))
saocom_gdf_scored = score_outliers_isolation_forest(saocom_gdf, 'diff_tinitaly')
# --- Step 2: Filter using the STATISTICAL method ---
# No more guessing a percentage! The function calculates the cutoff itself.
saocom_gdf_cleaned, saocom_outliers = filter_by_score_iqr(saocom_gdf_scored)
# --- Step 3: Visualize the results ---
visualize_outlier_results(saocom_gdf, saocom_gdf_cleaned, saocom_outliers, 'diff_tinitaly')
saocom_gdf = saocom_gdf_cleaned
66765 Fitting Isolation Forest model... Scoring complete. Added 'outlier_score' column. -0.07015801537863658 ============================================================ Statistically-Driven Filtering (IQR on Anomaly Scores) ------------------------------------------------------------ Score Q1: 0.0003, Q3: 0.0707, IQR: 0.0704 IQR Multiplier: 1 Calculated Score Threshold: < -0.0702 ------------------------------------------------------------ Total Points: 66,765 Outliers Removed: 2,710 (4.06%) Remaining Points: 64,055 ============================================================
1. SPATIAL SAMPLE CORINE LAND COVER AT SAOCOM POINTS¶
In [15]:
# =============================================================================
# PRE-REQUISITE: CALCULATE HEIGHT RESIDUALS
# (This step was missing/unexecuted from the previous cell sequence)
# =============================================================================
# Residual = Calibrated SAOCOM Height - Reference DEM Height
saocom_gdf['diff_tinitaly'] = saocom_gdf['HEIGHT_ABSOLUTE_TIN'] - saocom_gdf['tinitaly_height']
# saocom_gdf['diff_copernicus'] = saocom_gdf['HEIGHT_ABSOLUTE_COP'] - saocom_gdf['copernicus_height'] # Not needed for this table
# =============================================================================
# 1. SPATIAL SAMPLE CORINE LAND COVER AT SAOCOM POINTS
# =============================================================================
# The corine_10m raster is already loaded, reprojected, and masked (Cell 202).
corine_codes = []
# Assuming the global variables target_transform, grid_height, grid_width, corine_10m, and NODATA are defined in previous cells
for idx, row in saocom_gdf.iterrows():
# Use the same transform and grid dimensions as defined in Cell 200
row_idx, col_idx = rowcol(target_transform, row.geometry.x, row.geometry.y)
if 0 <= row_idx < grid_height and 0 <= col_idx < grid_width:
code = corine_10m[row_idx, col_idx]
# Skip NoData/Masked value (255)
corine_codes.append(code if code != 255 else 0)
else:
corine_codes.append(0) # 0 represents NoData/outside study area
saocom_gdf['corine_code'] = corine_codes
# Filter out points outside the valid CORINE area (where code is 0)
saocom_lc_analysis = saocom_gdf[saocom_gdf['corine_code'] != 0].copy()
# =============================================================================
# 2. CALCULATE ROBUST STATISTICS BY LAND COVER CLASS
# =============================================================================
def nmad(data):
"""Normalized Median Absolute Deviation (robust measure of spread)"""
return 1.4826 * np.median(np.abs(data - np.median(data)))
# Calculate stats for the recommended residual: SAOCOM (calibrated to TINITALY) - TINITALY DEM
lc_height_stats = saocom_lc_analysis.groupby('corine_code')['diff_tinitaly'].agg([
'count',
'median',
'mean',
'std',
nmad
]).reset_index()
# Rename columns for clarity
lc_height_stats.rename(columns={
'count': 'N_Points',
'median': 'Median_Diff_m',
'mean': 'Mean_Diff_m',
'std': 'Std_Dev_m',
'nmad': 'NMAD_m'
}, inplace=True)
# Add land cover label for interpretability
lc_height_stats['LC_Label'] = lc_height_stats['corine_code'].map(CORINE_CLASSES)
# Reorder and filter for classes with enough samples (N > 50)
MIN_SAMPLES = 50
lc_height_stats_filtered = lc_height_stats[lc_height_stats['N_Points'] >= MIN_SAMPLES]
lc_height_stats_filtered = lc_height_stats_filtered.sort_values('LC_Label', ascending=True)
# =============================================================================
# 3. DISPLAY RESULTS
# =============================================================================
print("\n" + "="*100)
print(f"HEIGHT RESIDUAL STATISTICS by CORINE Land Cover (N > {MIN_SAMPLES})")
print("(Residual = Calibrated SAOCOM Height - TINITALY Reference DEM)")
print("="*100)
display_cols = ['corine_code', 'LC_Label', 'N_Points', 'Median_Diff_m', 'NMAD_m', 'Mean_Diff_m', 'Std_Dev_m']
# Print the filtered, robustly sorted table
print(lc_height_stats_filtered[display_cols].to_string(
index=False,
float_format=lambda x: f'{x:+.2f}' if 'Diff' in lc_height_stats_filtered.columns.tolist() else f'{x:.2f}', # Generic float formatting
formatters={'N_Points': '{:,}'.format,
'Median_Diff_m': '{:+.2f} m'.format,
'NMAD_m': '{:.2f} m'.format,
'Mean_Diff_m': '{:+.2f} m'.format,
'Std_Dev_m': '{:.2f} m'.format}
))
print("="*100)
# Store the filtered results for later use in plotting/reporting
lc_height_stats_final = lc_height_stats_filtered.copy()
# =============================================================================
# 1. SETUP AND DEFINITIONS (from previous cells)
# =============================================================================
# Global variables assumed defined:
# corine_10m (masked CLC raster), study_area_mask (valid area inside hull),
# saocom_coverage (boolean array: True where SAOCOM data exists),
# void_mask (study_area_mask & ~saocom_coverage), GRID_SIZE (10m)
# CORINE_CLASSES (LC code lookup)
# Recalculate global void stats for context
n_total_cells = np.sum(study_area_mask)
n_void_cells = np.sum(void_mask)
void_percentage_global = 100 * n_void_cells / n_total_cells if n_total_cells > 0 else 0
# Get unique, valid CORINE codes within the study area
unique_lc_codes = np.unique(corine_10m[study_area_mask])
unique_lc_codes = unique_lc_codes[unique_lc_codes != 0] # Filter out 0/NoData
# =============================================================================
# 2. VOID ANALYSIS BY LAND COVER CLASS
# =============================================================================
void_stats_by_lc = []
cell_area_km2 = (GRID_SIZE / 1000.0) ** 2 # 0.0001 km^2
for lc_code in unique_lc_codes:
# Mask for this land cover class within the study area
lc_mask = study_area_mask & (corine_10m == lc_code)
# Total cells of this land cover
total_lc_cells = np.sum(lc_mask)
if total_lc_cells == 0:
continue
# Void cells within this land cover
void_lc_cells = np.sum(lc_mask & void_mask)
# METRIC 1: What % of this land cover is void? (Key Metric for coverage performance)
pct_of_lc_that_is_void = 100 * void_lc_cells / total_lc_cells
# METRIC 2: What % of total voids is this land cover? (Key Metric for contribution)
pct_of_total_voids = 100 * void_lc_cells / n_void_cells if n_void_cells > 0 else 0
void_stats_by_lc.append({
'corine_code': lc_code,
'label': CORINE_CLASSES.get(lc_code, f'Unknown_{lc_code}'),
'total_cells': total_lc_cells,
'void_cells': void_lc_cells,
'Area_km2': total_lc_cells * cell_area_km2,
'Pct_LC_is_Void': pct_of_lc_that_is_void,
'Pct_of_Total_Voids': pct_of_total_voids,
})
# Create DataFrame
void_stats_df = pd.DataFrame(void_stats_by_lc)
# =============================================================================
# 3. DISPLAY RESULTS
# =============================================================================
# Filter for significant land cover classes (e.g., > 1 km2 area)
MIN_AREA_KM2 = 1.0
void_stats_filtered = void_stats_df[void_stats_df['Area_km2'] >= MIN_AREA_KM2].copy()
# Sort by the primary metric: Percentage of the LC class that is void
void_stats_filtered = void_stats_filtered.sort_values('Pct_LC_is_Void', ascending=False)
print("\n" + "="*120)
print(f"VOID ANALYSIS by CORINE Land Cover (Area > {MIN_AREA_KM2:.1f} km²)")
print(f"Overall Void Percentage (Study Area): {void_percentage_global:.2f}%")
print("="*120)
display_cols = ['corine_code', 'label', 'Area_km2', 'void_cells', 'Pct_LC_is_Void', 'Pct_of_Total_Voids']
# Print the filtered table
print(void_stats_filtered[display_cols].to_string(
index=False,
float_format=lambda x: f'{x:.2f}',
formatters={'Area_km2': '{:.1f} km²'.format,
'void_cells': '{:,}'.format,
'Pct_LC_is_Void': '{:.2f} %'.format,
'Pct_of_Total_Voids': '{:.2f} %'.format}
))
print("="*120)
# Store the filtered results for later use in plotting/reporting
lc_void_stats_final = void_stats_filtered.copy()
====================================================================================================
HEIGHT RESIDUAL STATISTICS by CORINE Land Cover (N > 50)
(Residual = Calibrated SAOCOM Height - TINITALY Reference DEM)
====================================================================================================
corine_code LC_Label N_Points Median_Diff_m NMAD_m Mean_Diff_m Std_Dev_m
243 Agriculture/natural vegetation mix 9,796 -0.98 m NaN -0.70 m 4.68 m
331 Beaches, dunes, sands 1,804 -1.03 m 1.34 m -1.13 m 1.67 m
311 Broad-leaved forest 12,631 +1.98 m NaN +2.30 m 5.85 m
242 Complex cultivation patterns 2,807 -0.89 m 1.35 m -0.62 m 2.38 m
312 Coniferous forest 978 +2.79 m 6.22 m +2.70 m 6.53 m
112 Discontinuous urban fabric 5,107 +0.55 m 1.59 m +0.83 m 1.99 m
313 Mixed forest 263 +2.09 m 5.40 m +2.05 m 5.38 m
223 Olive groves 1,552 -3.41 m 2.72 m -2.91 m 3.61 m
231 Pastures 5,872 -0.82 m NaN -0.49 m 4.51 m
221 Vineyards 23,142 -0.12 m 1.92 m +0.29 m 3.34 m
====================================================================================================
========================================================================================================================
VOID ANALYSIS by CORINE Land Cover (Area > 1.0 km²)
Overall Void Percentage (Study Area): 87.04%
========================================================================================================================
corine_code label Area_km2 void_cells Pct_LC_is_Void Pct_of_Total_Voids
311 Broad-leaved forest 14.6 km² 132,547 90.84 % 30.74 %
243 Agriculture/natural vegetation mix 9.6 km² 85,772 89.33 % 19.89 %
223 Olive groves 1.3 km² 11,300 87.29 % 2.62 %
231 Pastures 3.9 km² 33,421 85.25 % 7.75 %
221 Vineyards 14.2 km² 119,813 84.16 % 27.79 %
242 Complex cultivation patterns 1.5 km² 12,342 81.95 % 2.86 %
112 Discontinuous urban fabric 2.2 km² 17,299 77.93 % 4.01 %
========================================================================================================================
1. PREPARE DATA FOR PLOTTING¶
In [16]:
# =============================================================================
# 1. PREPARE DATA FOR PLOTTING
# =============================================================================
# Use the full saocom_lc_analysis DataFrame which contains both the residuals
# ('diff_tinitaly') and the sampled land cover codes ('corine_code').
# Define major land cover groups for better visualization (CLC Level 1)
def get_clc_level1(code):
"""Maps CLC Level 3 code to Level 1 category"""
if 100 <= code < 200: return '1. Artificial Surfaces'
if 200 <= code < 300: return '2. Agricultural Areas'
if 300 <= code < 400: return '3. Forest & Semi-Natural Areas'
if 400 <= code < 500: return '4. Wetlands'
if 500 <= code < 600: return '5. Water Bodies'
return 'Other'
# Add Level 1 categories to the analysis DataFrame
saocom_lc_analysis['LC_Level_1'] = saocom_lc_analysis['corine_code'].apply(get_clc_level1)
saocom_lc_analysis['LC_Label'] = saocom_lc_analysis['corine_code'].map(CORINE_CLASSES)
# Filter for the most common Level 3 classes (using the N_Points filter from Step 1)
common_codes = lc_height_stats_final['corine_code'].unique()
plot_df_L3 = saocom_lc_analysis[saocom_lc_analysis['corine_code'].isin(common_codes)].copy()
# Filter extreme outliers for better plot scaling (e.g., 99th percentile)
q_low = plot_df_L3['diff_tinitaly'].quantile(0.005)
q_high = plot_df_L3['diff_tinitaly'].quantile(0.995)
plot_df_L3_filtered = plot_df_L3[(plot_df_L3['diff_tinitaly'] >= q_low) &
(plot_df_L3['diff_tinitaly'] <= q_high)]
# Sort the categories by the NMAD metric (best to worst performance)
nmad_order = lc_height_stats_final.sort_values('LC_Label', ascending=False)['LC_Label'].tolist()
plot_df_L3_filtered['LC_Label'] = pd.Categorical(
plot_df_L3_filtered['LC_Label'],
categories=nmad_order,
ordered=True
)
q_low = plot_df_L3['diff_copernicus'].quantile(0.005)
q_high = plot_df_L3['diff_copernicus'].quantile(0.995)
plot_df_cop_filtered = plot_df_L3[(plot_df_L3['diff_copernicus'] >= q_low) &
(plot_df_L3['diff_copernicus'] <= q_high)]
# Sort the categories by the NMAD metric (best to worst performance)
nmad_order = lc_height_stats_final.sort_values('LC_Label', ascending=False)['LC_Label'].tolist()
plot_df_cop_filtered['LC_Label'] = pd.Categorical(
plot_df_cop_filtered['LC_Label'],
categories=nmad_order,
ordered=True
)
SENTINEL-2 RGB PREPARATION¶
In [17]:
# =============================================================================
# SENTINEL-2 RGB PREPARATION
# =============================================================================
# File discovery
sentinel_files = list((DATA_DIR / "sentinel_data").glob("*.tif"))
if not sentinel_files:
raise FileNotFoundError("No Sentinel files found in sentinel_data directory")
# Load Sentinel bands (assuming separate R, G, B files or multi-band)
with rasterio.open(sentinel_files[0]) as src:
sentinel_count = src.count
if sentinel_count >= 3:
# Multi-band file - read RGB bands
sentinel_r = src.read(1) # Band 1 (Red)
sentinel_g = src.read(2) # Band 2 (Green)
sentinel_b = src.read(3) # Band 3 (Blue)
sentinel_transform_orig = src.transform
sentinel_crs = src.crs
else:
# Single band files - need to find R, G, B separately
r_file = next((f for f in sentinel_files if 'B04' in f.name or 'red' in f.name.lower()), None)
g_file = next((f for f in sentinel_files if 'B03' in f.name or 'green' in f.name.lower()), None)
b_file = next((f for f in sentinel_files if 'B02' in f.name or 'blue' in f.name.lower()), None)
if not all([r_file, g_file, b_file]):
raise FileNotFoundError("Could not find RGB bands in Sentinel files")
with rasterio.open(r_file) as r_src:
sentinel_r = r_src.read(1)
sentinel_transform_orig = r_src.transform
sentinel_crs = r_src.crs
with rasterio.open(g_file) as g_src:
sentinel_g = g_src.read(1)
with rasterio.open(b_file) as b_src:
sentinel_b = b_src.read(1)
# Resample each band to 10m grid
sentinel_r_10m = np.zeros((grid_height, grid_width), dtype=np.float32)
sentinel_g_10m = np.zeros((grid_height, grid_width), dtype=np.float32)
sentinel_b_10m = np.zeros((grid_height, grid_width), dtype=np.float32)
for band_src, band_dst in [(sentinel_r, sentinel_r_10m),
(sentinel_g, sentinel_g_10m),
(sentinel_b, sentinel_b_10m)]:
reproject(
source=band_src,
destination=band_dst,
src_transform=sentinel_transform_orig,
src_crs=sentinel_crs,
dst_transform=target_transform,
dst_crs=TARGET_CRS,
resampling=Resampling.bilinear
)
# Stack into RGB array
sentinel_rgb = np.stack([sentinel_r_10m, sentinel_g_10m, sentinel_b_10m], axis=2)
# Mask to study area
sentinel_rgb[~hull_mask] = 0
# Normalize to 0-1 for display (using 2-98 percentile stretch for contrast)
sentinel_rgb_norm = np.zeros_like(sentinel_rgb, dtype=np.float32)
for i in range(3):
band = sentinel_rgb[:, :, i]
valid_pixels = band[hull_mask]
if len(valid_pixels) > 0:
p2, p98 = np.percentile(valid_pixels[valid_pixels > 0], [2, 98])
band_norm = np.clip((band - p2) / (p98 - p2), 0, 1)
sentinel_rgb_norm[:, :, i] = band_norm
print(f"\nSentinel-2 RGB Prepared:")
print(f" Shape: {sentinel_rgb_norm.shape}")
print(f" Resolution: {GRID_SIZE}m")
print(f" Value range: [{sentinel_rgb_norm.min():.3f}, {sentinel_rgb_norm.max():.3f}]")
Sentinel-2 RGB Prepared: Shape: (777, 929, 3) Resolution: 10m Value range: [0.000, 1.000]
2. GENERATE VIOLIN PLOT (Level 3 - Detailed Performance)¶
In [18]:
# =============================================================================
# 1. GENERATE STATS WITHIN THE PLOT SCRIPT
# =============================================================================
print("Generating statistics just-in-time for the plot...")
def calculate_nmad(series):
"""Calculates the Normalized Median Absolute Deviation (NMAD)."""
return (series - series.median()).abs().median() * 1.4826
# Create a new, temporary stats DataFrame by grouping the plotting data
stats_for_plot = plot_df_L3_filtered.groupby('LC_Label')['diff_tinitaly'].agg(
Median_Diff_m='median',
NMAD_m=calculate_nmad,
Std_Dev_m='std',
Min_Diff_m='min',
Max_Diff_m='max'
).reset_index()
# Determine the plotting order based on the NMAD we just calculated
nmad_order = stats_for_plot.sort_values('NMAD_m')['LC_Label'].tolist()
# Define an anchor point for the text annotations
q_high = plot_df_L3_filtered['diff_tinitaly'].quantile(0.99) + 5
# =============================================================================
# 2. GENERATE VIOLIN PLOT (Level 3 - Detailed Performance)
# =============================================================================
plt.figure(figsize=(16, 9), facecolor='white')
# Use violin plot, ordering it with the 'nmad_order' list we just created
sns.violinplot(
x='diff_tinitaly',
y='LC_Label',
data=plot_df_L3_filtered,
order=nmad_order, # Use the calculated order here
inner='quartile',
palette='Spectral_r',
orient='h',
linewidth=1.0,
cut=0
)
# Add a vertical line at zero error
plt.axvline(0, color='red', linestyle='--', linewidth=1.5, alpha=0.7)
# Set labels and title
plt.title('Distribution of SAOCOM Height Residuals by Land Cover (CLC Level 3)', fontweight='bold', fontsize=18)
plt.xlabel('Height Residual (Calibrated SAOCOM - TINITALY DEM) [m]', fontsize=14)
plt.ylabel('CORINE Land Cover Class (Ordered by NMAD)', fontsize=14)
plt.grid(axis='x', linestyle='--', alpha=0.5)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
# --- Add Annotations using the stats we just generated ---
for i, label in enumerate(nmad_order):
# Query the 'stats_for_plot' DataFrame we created above
stats = stats_for_plot.query("LC_Label == @label").iloc[0]
stats_text = (
f"Median: {stats['Median_Diff_m']:+.2f} m\n"
f"NMAD: {stats['NMAD_m']:.2f} m\n"
f"Std Dev: {stats['Std_Dev_m']:.2f} m\n"
f"Min/Max: [{stats['Min_Diff_m']:.1f}, {stats['Max_Diff_m']:.1f}] m"
)
plt.text(q_high, i, stats_text,
verticalalignment='center',
horizontalalignment='left',
fontsize=10,
bbox=dict(boxstyle='round,pad=0.4', facecolor='white', alpha=0.8)
)
plt.tight_layout(rect=[0, 0, 0.8, 1])
plt.show()
print("Violin Plot for Level 3 Land Cover classes generated successfully.")
# Add this line to save the figure
plt.savefig(RESULTS_DIR / 'saocom_tinitaly_residuals_by_landcover.png', dpi=300, bbox_inches='tight')
plt.show()
# =============================================================================
# COPERNICUS STATISTICS AND VIOLIN PLOT (Level 3)
# =============================================================================
print("Generating statistics for Copernicus plot...")
def calculate_nmad(series):
"""Calculates the Normalized Median Absolute Deviation (NMAD)."""
return (series - series.median()).abs().median() * 1.4826
# Create a new stats DataFrame by grouping the filtered Copernicus plotting data
stats_for_cop_plot = plot_df_cop_filtered.groupby('LC_Label')['diff_copernicus'].agg(
Median_Diff_m='median',
NMAD_m=calculate_nmad,
Std_Dev_m='std',
Min_Diff_m='min',
Max_Diff_m='max'
).reset_index()
# Determine the plotting order based on the NMAD we just calculated
nmad_order_cop = stats_for_cop_plot.sort_values('NMAD_m')['LC_Label'].tolist()
# Define an anchor point for the text annotations
q_high_cop = plot_df_cop_filtered['diff_copernicus'].quantile(0.995) + 5
# =============================================================================
# 2. GENERATE COPERNICUS VIOLIN PLOT (Level 3)
# =============================================================================
plt.figure(figsize=(16, 9), facecolor='white')
# Generate the violin plot, ordering it with the 'nmad_order_cop' list
sns.violinplot(
x='diff_copernicus',
y='LC_Label',
data=plot_df_cop_filtered,
order=nmad_order_cop, # Use the calculated order here
inner='quartile',
palette='Spectral_r',
orient='h',
linewidth=1.0,
cut=0
)
# Add a vertical line at zero error
plt.axvline(0, color='red', linestyle='--', linewidth=1.5, alpha=0.7)
# Set labels and title
plt.title('Distribution of SAOCOM Height Residuals by Land Cover (vs. Copernicus)', fontweight='bold', fontsize=18)
plt.xlabel('Height Residual (Calibrated SAOCOM - Copernicus DEM) [m]', fontsize=14)
plt.ylabel('CORINE Land Cover Class (Ordered by NMAD)', fontsize=14)
plt.grid(axis='x', linestyle='--', alpha=0.5)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
# --- Add Annotations using the stats we just generated ---
for i, label in enumerate(nmad_order_cop):
# Query the 'stats_for_cop_plot' DataFrame
stats = stats_for_cop_plot.query("LC_Label == @label").iloc[0]
stats_text = (
f"Median: {stats['Median_Diff_m']:+.2f} m\n"
f"NMAD: {stats['NMAD_m']:.2f} m\n"
f"Std Dev: {stats['Std_Dev_m']:.2f} m\n"
f"Min/Max: [{stats['Min_Diff_m']:.1f}, {stats['Max_Diff_m']:.1f}] m"
)
plt.text(q_high_cop, i, stats_text,
verticalalignment='center',
horizontalalignment='left',
fontsize=10,
bbox=dict(boxstyle='round,pad=0.4', facecolor='white', alpha=0.8)
)
plt.tight_layout(rect=[0, 0, 0.8, 1])
plt.show()
print("Violin Plot for Copernicus comparison generated successfully.")
# Add this line to save the figure
plt.savefig(RESULTS_DIR / 'saocom_copernicus_residuals_by_landcover.png', dpi=300, bbox_inches='tight')
plt.show()
# =============================================================================
# 3. GENERATE BOX PLOT (Level 1 - Broad Comparison)
# =============================================================================
plt.figure(figsize=(10, 6), facecolor='white')
# Use box plot for a cleaner Level 1 aggregation
sns.boxplot(
x='LC_Level_1',
y='diff_tinitaly',
data=plot_df_L3_filtered,
palette='Set2',
linewidth=1.0,
showfliers=False # Do not show outliers already filtered
)
# Add a horizontal line at zero error
plt.axhline(0, color='red', linestyle='--', linewidth=1.5, alpha=0.7)
# Set labels and title
plt.title(
'SAOCOM Height Residuals by Land Cover (CLC Level 1)',
fontweight='bold',
fontsize=14
)
plt.xlabel('CORINE Land Cover Category (Level 1)', fontsize=11)
plt.ylabel('Height Residual (m)', fontsize=11)
plt.xticks(rotation=15, ha='right')
plt.grid(axis='y', linestyle='--', alpha=0.5)
plt.tight_layout()
plt.show()
print("Box Plot for Level 1 Land Cover categories generated successfully.")
# =============================================================================
# CORINE LAND COVER VISUALIZATION
# =============================================================================
fig, ax = plt.subplots(figsize=(14, 10), facecolor='white')
ax.set_facecolor('white')
# Get extent
extent = [xmin_grid, xmax_grid, ymin_grid, ymax_grid]
# Mask NoData/zero values for transparency
corine_display = np.ma.masked_where((corine_10m == 0) | (corine_10m == 255), corine_10m)
# Get unique classes in study area
unique_codes = np.unique(corine_display.compressed())
# Create colormap for present classes only
colors_list = [CORINE_COLORS_MPL.get(code, (0.5, 0.5, 0.5)) for code in unique_codes]
cmap = ListedColormap(colors_list)
norm = BoundaryNorm(boundaries=np.append(unique_codes, unique_codes[-1]+1) - 0.5,
ncolors=len(unique_codes))
# Plot CORINE
im = ax.imshow(corine_display, cmap=cmap, norm=norm, origin='upper', extent=extent)
# Add study area boundary
hull_gdf.boundary.plot(ax=ax, color='black', linewidth=2, label='Study Area')
# Labels and title
ax.set_xlabel('UTM Easting (m)', fontsize=14, color='black')
ax.set_ylabel('UTM Northing (m)', fontsize=14, color='black')
ax.set_title('CORINE Land Cover 2018', fontweight='bold', fontsize=18, color='black')
ax.tick_params(colors='black', labelsize=12)
ax.grid(True, alpha=0.3, linewidth=0.5, color='black')
# Create legend with only present classes
legend_elements = [plt.Rectangle((0,0),1,1, facecolor=CORINE_COLORS_MPL[code],
edgecolor='black', linewidth=0.5,
label=f"{code}: {CORINE_CLASSES.get(code, 'Unknown')}")
for code in sorted(unique_codes)]
ax.legend(handles=legend_elements, loc='center left', bbox_to_anchor=(1, 0.5),
fontsize=13, frameon=True, fancybox=False, edgecolor='black')
# Add scale bar
scalebar = ScaleBar(1, location='lower right', box_alpha=0.8, color='black')
ax.add_artist(scalebar)
# Add statistics box
total_area_km2 = np.sum(study_area_mask) * 0.0001
stats_text = f"Study Area: {total_area_km2:.2f} km²\nClasses: {len(unique_codes)}"
ax.text(0.02, 0.98, stats_text, transform=ax.transAxes, fontsize=12,
verticalalignment='top', bbox=dict(boxstyle='round', facecolor='white',
alpha=0.9, edgecolor='black'))
plt.tight_layout()
plt.show()
print(f"\nCORINE Land Cover Map:")
print(f" Total classes present: {len(unique_codes)}")
print(f" Study area: {total_area_km2:.2f} km²")
## =============================================================================
# SAOCOM HEIGHT RESIDUALS - INTERPOLATED HEAT MAPS
# =============================================================================
from scipy.interpolate import griddata
from scipy.ndimage import gaussian_filter
fig, axes = plt.subplots(1, 2, figsize=(22, 10), facecolor='white')
extent = [xmin_grid, xmax_grid, ymin_grid, ymax_grid]
# Filter valid points
valid_tin = saocom_gdf[saocom_gdf['diff_tinitaly'].notna()].copy()
valid_cop = saocom_gdf[saocom_gdf['diff_copernicus'].notna()].copy()
# Calculate symmetric color limits (95th percentile)
tin_limit = np.percentile(np.abs(valid_tin['diff_tinitaly']), 95)
cop_limit = np.percentile(np.abs(valid_cop['diff_copernicus']), 95)
common_limit = max(tin_limit, cop_limit)
# Create interpolation grid (matching the 10m grid)
xi = np.linspace(xmin_grid, xmax_grid, grid_width)
yi = np.linspace(ymax_grid, ymin_grid, grid_height)
xi_grid, yi_grid = np.meshgrid(xi, yi)
# =============================================================================
# Plot 1: SAOCOM - TINITALY Heat Map
# =============================================================================
ax = axes[0]
ax.set_facecolor('white')
# Background: Sentinel RGB (very faint)
ax.imshow(sentinel_rgb_norm, extent=extent, origin='upper', alpha=0.2)
# Extract coordinates and values
x_tin = valid_tin.geometry.x.values
y_tin = valid_tin.geometry.y.values
z_tin = valid_tin['diff_tinitaly'].values
# Interpolate to grid using cubic method
print("Interpolating TINITALY differences...")
zi_tin = griddata((x_tin, y_tin), z_tin, (xi_grid, yi_grid),
method='cubic', fill_value=np.nan)
# Apply Gaussian smoothing for smoother heat map
zi_tin_smooth = gaussian_filter(np.nan_to_num(zi_tin, nan=0), sigma=2)
zi_tin_smooth[~hull_mask] = np.nan # Mask to study area
# Plot heat map
im1 = ax.imshow(zi_tin_smooth, extent=extent, origin='upper',
cmap='RdBu_r', alpha=0.8,
vmin=-common_limit, vmax=common_limit,
interpolation='bilinear')
# Overlay original points (small, for reference)
ax.scatter(x_tin, y_tin, c=z_tin, cmap='RdBu_r',
s=0.5, alpha=0.3, edgecolors='none',
vmin=-common_limit, vmax=common_limit)
# Study area boundary
hull_gdf.boundary.plot(ax=ax, color='black', linewidth=2.5, linestyle='--')
# Colorbar
cbar1 = plt.colorbar(im1, ax=ax, label='Height Difference (m)',
shrink=0.8, pad=0.02)
cbar1.ax.tick_params(labelsize=10, colors='black')
cbar1.ax.yaxis.label.set_color('black')
ax.set_xlabel('UTM Easting (m)', fontsize=12, color='black')
ax.set_ylabel('UTM Northing (m)', fontsize=12, color='black')
ax.set_title('SAOCOM - TINITALY\nInterpolated Height Residual Heat Map',
fontweight='bold', fontsize=14, color='black')
ax.grid(True, alpha=0.3, color='black', linewidth=0.5)
ax.tick_params(colors='black')
# Statistics box
stats_text = f"""Points: {len(valid_tin):,}
Mean: {valid_tin['diff_tinitaly'].mean():+.2f} m
Median: {valid_tin['diff_tinitaly'].median():+.2f} m
RMSE: {np.sqrt((valid_tin['diff_tinitaly']**2).mean()):.2f} m
NMAD: {1.4826 * np.median(np.abs(valid_tin['diff_tinitaly'] - valid_tin['diff_tinitaly'].median())):.2f} m
Std: {valid_tin['diff_tinitaly'].std():.2f} m
Interpolation: Cubic + Gaussian
Color Scale: ±{common_limit:.1f} m
Red = SAOCOM Higher
Blue = TINITALY Higher"""
ax.text(0.02, 0.98, stats_text, transform=ax.transAxes, fontsize=9,
verticalalignment='top', fontfamily='monospace',
bbox=dict(boxstyle='round', facecolor='white', alpha=0.9,
edgecolor='black'))
# =============================================================================
# Plot 2: SAOCOM - Copernicus Heat Map
# =============================================================================
ax = axes[1]
ax.set_facecolor('white')
# Background: Sentinel RGB (very faint)
ax.imshow(sentinel_rgb_norm, extent=extent, origin='upper', alpha=0.2)
# Extract coordinates and values
x_cop = valid_cop.geometry.x.values
y_cop = valid_cop.geometry.y.values
z_cop = valid_cop['diff_copernicus'].values
# Interpolate to grid using cubic method
print("Interpolating Copernicus differences...")
zi_cop = griddata((x_cop, y_cop), z_cop, (xi_grid, yi_grid),
method='cubic', fill_value=np.nan)
# Apply Gaussian smoothing
zi_cop_smooth = gaussian_filter(np.nan_to_num(zi_cop, nan=0), sigma=2)
zi_cop_smooth[~hull_mask] = np.nan # Mask to study area
# Plot heat map
im2 = ax.imshow(zi_cop_smooth, extent=extent, origin='upper',
cmap='RdBu_r', alpha=0.8,
vmin=-common_limit, vmax=common_limit,
interpolation='bilinear')
# Overlay original points (small, for reference)
ax.scatter(x_cop, y_cop, c=z_cop, cmap='RdBu_r',
s=0.5, alpha=0.3, edgecolors='none',
vmin=-common_limit, vmax=common_limit)
# Study area boundary
hull_gdf.boundary.plot(ax=ax, color='black', linewidth=2.5, linestyle='--')
# Colorbar
cbar2 = plt.colorbar(im2, ax=ax, label='Height Difference (m)',
shrink=0.8, pad=0.02)
cbar2.ax.tick_params(labelsize=10, colors='black')
cbar2.ax.yaxis.label.set_color('black')
ax.set_xlabel('UTM Easting (m)', fontsize=12, color='black')
ax.set_ylabel('UTM Northing (m)', fontsize=12, color='black')
ax.set_title('SAOCOM - Copernicus\nInterpolated Height Residual Heat Map',
fontweight='bold', fontsize=14, color='black')
ax.grid(True, alpha=0.3, color='black', linewidth=0.5)
ax.tick_params(colors='black')
# Statistics box
stats_text = f"""Points: {len(valid_cop):,}
Mean: {valid_cop['diff_copernicus'].mean():+.2f} m
Median: {valid_cop['diff_copernicus'].median():+.2f} m
RMSE: {np.sqrt((valid_cop['diff_copernicus']**2).mean()):.2f} m
NMAD: {1.4826 * np.median(np.abs(valid_cop['diff_copernicus'] - valid_cop['diff_copernicus'].median())):.2f} m
Std: {valid_cop['diff_copernicus'].std():.2f} m
Interpolation: Cubic + Gaussian
Color Scale: ±{common_limit:.1f} m
Red = SAOCOM Higher
Blue = Copernicus Higher"""
ax.text(0.02, 0.98, stats_text, transform=ax.transAxes, fontsize=9,
verticalalignment='top', fontfamily='monospace',
bbox=dict(boxstyle='round', facecolor='white', alpha=0.9,
edgecolor='black'))
plt.tight_layout()
plt.savefig(RESULTS_DIR / 'saocom_residual_heatmaps_interpolated.png',
dpi=300, bbox_inches='tight', facecolor='white')
plt.show()
print("\nSaved: saocom_residual_heatmaps_interpolated.png")
print(f"Interpolation method: Cubic spline + Gaussian smoothing (sigma=2)")
print(f"Color scale range: ±{common_limit:.2f} m")
Generating statistics just-in-time for the plot...
Violin Plot for Level 3 Land Cover classes generated successfully.
<Figure size 640x480 with 0 Axes>
Generating statistics for Copernicus plot...
Violin Plot for Copernicus comparison generated successfully.
<Figure size 640x480 with 0 Axes>
Box Plot for Level 1 Land Cover categories generated successfully.
CORINE Land Cover Map: Total classes present: 10 Study area: 49.54 km² Interpolating TINITALY differences... Interpolating Copernicus differences...
Saved: saocom_residual_heatmaps_interpolated.png Interpolation method: Cubic spline + Gaussian smoothing (sigma=2) Color scale range: ±9.55 m
Class Overlays Basic¶
INDIVIDUAL CLASS OVERLAY MAPS (COLORBLIND-FRIENDLY)¶
In [19]:
# =============================================================================
# INDIVIDUAL CLASS OVERLAY MAPS (COLORBLIND-FRIENDLY)
# =============================================================================
from matplotlib.patches import Patch
# Get unique classes present in data
unique_classes = np.unique(corine_10m[corine_10m > 0])
# Create one map per class
for lc_code in sorted(unique_classes):
fig, ax = plt.subplots(1, 1, figsize=(14, 10), facecolor='white')
ax.set_facecolor('white')
# Display Sentinel RGB as background
ax.imshow(sentinel_rgb_norm, extent=[xmin_grid, xmax_grid, ymin_grid, ymax_grid],
origin='upper', alpha=0.7) # Slight transparency to help overlay show
# Get color for this land cover class
fill_color = tuple(c/255 for c in CORINE_COLORS.get(lc_code, (128, 128, 128)))
# Create mask for this land cover class
lc_mask = (corine_10m == lc_code)
# Vectorize to get boundaries
mask_shapes = shapes(lc_mask.astype(np.uint8), mask=lc_mask, transform=target_transform)
# Convert to polygons and plot
polys = [shape(geom) for geom, val in mask_shapes if val == 1]
if polys:
for poly in polys:
if poly.is_valid:
x, y = poly.exterior.xy
# Fill with class-specific color + hatching for visibility
ax.fill(x, y, color=fill_color, alpha=0.4,
edgecolor='none', hatch='///', linewidth=0)
# Bold black outline for definition
ax.plot(x, y, color='black', linewidth=2.5, alpha=0.9)
# Colored inner outline
ax.plot(x, y, color=fill_color, linewidth=1.5, alpha=1.0)
# Add study area boundary
hull_gdf.boundary.plot(ax=ax, color='black', linewidth=3, linestyle='--', alpha=0.8)
hull_gdf.boundary.plot(ax=ax, color='red', linewidth=1.5, linestyle='--', alpha=1.0)
# Calculate statistics
lc_count = np.sum(lc_mask)
area_km2 = lc_count * (GRID_SIZE**2) / 1e6
pct_area = 100 * lc_count / np.sum(corine_10m > 0)
# Title with statistics
class_name = CORINE_CLASSES.get(lc_code, f'Class {lc_code}')
ax.set_title(f'Land Cover: {class_name}\n'
f'Code {lc_code} | Area: {area_km2:.1f} km² ({pct_area:.1f}%)',
fontweight='bold', fontsize=13, pad=15)
ax.set_xlabel('UTM Easting (m)', fontsize=11)
ax.set_ylabel('UTM Northing (m)', fontsize=11)
ax.grid(True, alpha=0.3, color='gray', linewidth=0.5)
# Legend with hatching
legend_elements = [
Patch(facecolor=fill_color, edgecolor='black', linewidth=2,
alpha=0.4, hatch='///', label=class_name),
Patch(facecolor='none', edgecolor='red', linestyle='--',
linewidth=2, label='Study Area')
]
ax.legend(handles=legend_elements, loc='upper right', fontsize=10,
frameon=True, fancybox=False, edgecolor='black')
plt.tight_layout()
# Save
safe_name = class_name.replace(' ', '_').replace(',', '').replace('/', '_')
filename = f'landcover_{lc_code}_{safe_name}.png'
plt.savefig(RESULTS_DIR / filename, dpi=300, bbox_inches='tight', facecolor='white')
plt.show()
plt.close()
print(f"Saved: {filename}")
print(f"\nGenerated {len(unique_classes)} individual land cover overlay maps")
Saved: landcover_112_Discontinuous_urban_fabric.png
Saved: landcover_221_Vineyards.png
Saved: landcover_223_Olive_groves.png
Saved: landcover_231_Pastures.png
Saved: landcover_242_Complex_cultivation_patterns.png
Saved: landcover_243_Agriculture_natural_vegetation_mix.png